问题 Parallel.ForEach丢失数据


Parallel.ForEach有助于提高性能,但我看到数据丢失。

尝试 - 变量结果,processedData是 ConcurrentBag<IwrRows>

1)

Parallel.ForEach(results, () => new ConcurrentBag<IwrRows>(), (n, loopState, localData)    =>
{
 return ProcessData(n); // ProcessData complicated business logic
}, (localData) => AddRows(localData, processedData, obj)
);

2)

await Task.Run(() => Parallel.ForEach(results, item =>
        {
            ProcessData(item, processedData);  
        }));

3)

Parallel.ForEach(results, item =>
 {
 ProcessData(item, processedData);
 });

所有人都失去了一些行。

当我使用foreach块时,它始终返回相同的值,但是它慢了4倍。

foreach (var item in results)
        {
            // ProcessData returns a List<IwrRows>
            processedData.AddRange(ProcessData(item));
        }

不知道我在这里缺少什么。

结果 - 51112 Foreach返回41316行。 ForeachParallel返回41308或41313或41314随每次运行而变化


10952
2018-03-18 13:18


起源

请发一个 最小,完整和可验证的例子 - Daniel A. White
是什么定义 ProcessData()? - Glorin Oakenfoot
我同意makro88的答案中的一点:你至少应该解释ProcessData()函数的通常预期返回值,以便其他人可以更好地帮助你。正如你现在写的问题,似乎这个函数返回一个包含0-N项的已处理数据的集合。你可以编辑问题并澄清这一点吗?项目数量的预期数学不清楚,其他人无法重现。尝试在a中隔离您的问题 MCVE 如前所述,否则这个问题应该被标记和关闭。 - sɐunıɔןɐqɐp
可能重复: 是ConcurrentBag中的Parallel.ForEach <T>线程安全 - sɐunıɔןɐqɐp


答案:


你似乎很难对结果感到困惑,并让它们回到一个连贯的列表中。您可以使用PLinQ,因此您不必担心结果容器是线程安全的:

var processedData = yourData.AsParallel().Select(ProcessData).ToList();

6
2018-03-18 13:33



与foreach循环相比,这将返回相同的数据行。它的速度是Parallel.Foreach的两倍,但速度比foreach快。在这种情况下,Parallel.Foreach和AsParallel有什么区别?为什么即使在使用ConcurrentBag后我丢失数据? - Karthik Giddu


您遇到的问题似乎是:AddRows(localData,processedData,obj)。此方法可能是将数据添加到非线程安全的列表中。您应该添加到线程安全列表或围绕添加数据进行一些同步。


4
2018-03-18 13:26



实际上,OP提到了 processedData 是类型 ConcurrentBag<IwrRows>从理论上讲,它应该处理并发问题。 - sɐunıɔןɐqɐp


运用 await Task.Run 在2)我觉得没用。

如果 Foreach returns 41316 rows back 对于 Results - 51112 比问题还没有 Parallel.ForEach 但在你的添加/处理机制。请记住,即使 ConcurrentBag 保证它上面的每个操作都是线程安全的,它不会重复。


0
2018-03-18 13:36





那么你的商务逻辑(ProcessData)肯定存在问题。
也许不是pararell.foreach但我认为这可能会加快你的代码,使用LINQ也是异步。
就这样,我正在处理一些数据上的并行异步操作。
您可能需要展平taskList的结果(从头部写入的完整伪代码)。你可以随时使用yield return来实现你的列表,这可能会使它更加紧张。但要谨慎使用产量:)

var taskList = results.Select(async item =>
    {
        return await ProcessData(item, processedData);  
    });

await Task.WhenAll(taskList);

使用WhenAll或WaitAll取决于您想要的情况

Task.WaitAll:

At least one of the Task instances was canceled -or- an exception was thrown during the execution of  
at least one of the Task instances.If a task was canceled, the AggregateException contains an  
OperationCanceledException in its InnerExceptions collection.

Task.WhenAll:

If any of the supplied tasks completes in a faulted state, the returned task will also complete in a   
Faulted state, where its exceptions will contain the aggregation of the set of unwrapped exceptions  
from each of the supplied tasks.

0
2017-09-19 13:20