Parallel.ForEach有助于提高性能,但我看到数据丢失。
尝试 - 变量结果,processedData是 ConcurrentBag<IwrRows>
1)
Parallel.ForEach(results, () => new ConcurrentBag<IwrRows>(), (n, loopState, localData) =>
{
return ProcessData(n); // ProcessData complicated business logic
}, (localData) => AddRows(localData, processedData, obj)
);
2)
await Task.Run(() => Parallel.ForEach(results, item =>
{
ProcessData(item, processedData);
}));
3)
Parallel.ForEach(results, item =>
{
ProcessData(item, processedData);
});
所有人都失去了一些行。
当我使用foreach块时,它始终返回相同的值,但是它慢了4倍。
foreach (var item in results)
{
// ProcessData returns a List<IwrRows>
processedData.AddRange(ProcessData(item));
}
不知道我在这里缺少什么。
结果 - 51112
Foreach返回41316行。
ForeachParallel返回41308或41313或41314随每次运行而变化
你似乎很难对结果感到困惑,并让它们回到一个连贯的列表中。您可以使用PLinQ,因此您不必担心结果容器是线程安全的:
var processedData = yourData.AsParallel().Select(ProcessData).ToList();
您遇到的问题似乎是:AddRows(localData,processedData,obj)。此方法可能是将数据添加到非线程安全的列表中。您应该添加到线程安全列表或围绕添加数据进行一些同步。
运用 await Task.Run
在2)我觉得没用。
如果 Foreach returns 41316 rows back
对于 Results - 51112
比问题还没有 Parallel.ForEach
但在你的添加/处理机制。请记住,即使 ConcurrentBag
保证它上面的每个操作都是线程安全的,它不会重复。
那么你的商务逻辑(ProcessData)肯定存在问题。
也许不是pararell.foreach但我认为这可能会加快你的代码,使用LINQ也是异步。
就这样,我正在处理一些数据上的并行异步操作。
您可能需要展平taskList的结果(从头部写入的完整伪代码)。你可以随时使用yield return来实现你的列表,这可能会使它更加紧张。但要谨慎使用产量:)
var taskList = results.Select(async item =>
{
return await ProcessData(item, processedData);
});
await Task.WhenAll(taskList);
使用WhenAll或WaitAll取决于您想要的情况
Task.WaitAll:
At least one of the Task instances was canceled -or- an exception was thrown during the execution of
at least one of the Task instances.If a task was canceled, the AggregateException contains an
OperationCanceledException in its InnerExceptions collection.
Task.WhenAll:
If any of the supplied tasks completes in a faulted state, the returned task will also complete in a
Faulted state, where its exceptions will contain the aggregation of the set of unwrapped exceptions
from each of the supplied tasks.