Karthik Giddu

Reputation: 129

Parallel.ForEach losing data

Parallel.ForEach helps improve performance; however, I am seeing data loss.

What I tried (the variables results and processedData are both ConcurrentBag<IwrRows>):

1)

Parallel.ForEach(results, () => new ConcurrentBag<IwrRows>(), (n, loopState, localData) =>
{
    return ProcessData(n); // ProcessData complicated business logic
},
(localData) => AddRows(localData, processedData, obj));

2)

await Task.Run(() => Parallel.ForEach(results, item =>
{
    ProcessData(item, processedData);
}));

3)

Parallel.ForEach(results, item =>
{
    ProcessData(item, processedData);
});

All of them lost some rows.

When I use a plain foreach block, it consistently returns the same value; however, it's 4 times slower.

foreach (var item in results)
{
    // ProcessData returns a List<IwrRows>
    processedData.AddRange(ProcessData(item));
}

Not sure what I am missing here.

results contains 51112 items. foreach returns 41316 rows back. Parallel.ForEach returns 41308, 41313 or 41314 rows; it varies with each run.

Upvotes: 10

Views: 4178

Answers (4)

Kacper Werema

Reputation: 448

Well, there may well be a problem with your business logic (ProcessData).
It's not a Parallel.ForEach, but I think the following might speed up your code a little; it's async as well, using LINQ.
That's the way I handle parallel async operations on some data.
You'll probably need to flatten the result of taskList (it's all pseudo-code written from memory; see the sketch below). You could also use yield return to materialize your list later, which might speed it up a bit more. But use yield with caution :)

var taskList = results.Select(async item =>
{
    return await ProcessData(item, processedData);
});

await Task.WhenAll(taskList);
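
A minimal sketch of the flattening, assuming a hypothetical async overload ProcessDataAsync(item) that returns Task<List<IwrRows>> (the names ProcessDataAsync and perItemRows are illustrative, not from the question):

// Requires System.Linq and System.Threading.Tasks.
// ProcessDataAsync is a hypothetical async overload returning Task<List<IwrRows>>.
var taskList = results.Select(item => ProcessDataAsync(item));

// WhenAll gives one List<IwrRows> per input item...
List<IwrRows>[] perItemRows = await Task.WhenAll(taskList);

// ...which SelectMany flattens into a single list of rows.
var processedData = perItemRows.SelectMany(rows => rows).ToList();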

Use WhenAll or WaitAll depending on the case you want:

Task.WaitAll:

At least one of the Task instances was canceled -or- an exception was thrown during the execution of at least one of the Task instances. If a task was canceled, the AggregateException contains an OperationCanceledException in its InnerExceptions collection.

Task.WhenAll:

If any of the supplied tasks completes in a faulted state, the returned task will also complete in a Faulted state, where its exceptions will contain the aggregation of the set of unwrapped exceptions from each of the supplied tasks.

Upvotes: 0

makro88

Reputation: 102

Using await Task.Run in 2) is useless, I think.

If foreach returns 41316 rows back for 51112 results, then the problem is not in Parallel.ForEach but in your adding/processing mechanism. Remember that even though ConcurrentBag guarantees that each operation on it is thread-safe, it doesn't take care of duplicates.
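
For example, a ConcurrentBag never drops or merges what you add to it; a quick sketch (plain int values, just for illustration):

// Requires System, System.Collections.Concurrent and System.Threading.Tasks.
var bag = new ConcurrentBag<int>();
Parallel.For(0, 4, _ => bag.Add(42));   // the same value added from several threads
Console.WriteLine(bag.Count);           // 4 - every Add is kept, duplicates included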

Upvotes: 0

nvoigt

Reputation: 77364

You seem to be struggling with the results and getting them back into a coherent list. You could use PLINQ, so you don't have to bother with the results container being thread-safe:

var processedData = yourData.AsParallel().Select(ProcessData).ToList();
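
Since the question says ProcessData(item) returns a List<IwrRows>, that Select gives you a list of lists; a small sketch of flattening it in the same query (assuming the single-argument ProcessData overload from the question):

// Requires System.Linq. ProcessData(item) is the overload from the
// question that returns a List<IwrRows> for one input item.
var processedData = results
    .AsParallel()
    .SelectMany(item => ProcessData(item))  // flatten each item's rows into one sequence
    .ToList();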

Upvotes: 7

Peter

Reputation: 27944

Your problem seems to be in AddRows(localData, processedData, obj). This method is probably adding data to a list which is not thread-safe. You should add the rows to a thread-safe collection or put some synchronization around the adding of the data.
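
A minimal sketch of what that synchronization could look like, assuming processedData ends up being a plain List<IwrRows> (the local sync object and the List<IwrRows> used as thread-local state are assumptions for illustration, not the question's actual code):

// Requires System.Collections.Generic and System.Threading.Tasks.
var sync = new object();                       // hypothetical lock object guarding processedData

Parallel.ForEach(results,
    () => new List<IwrRows>(),                 // one local list per worker thread
    (n, loopState, localData) =>
    {
        localData.AddRange(ProcessData(n));    // no locking needed: the list is thread-local
        return localData;
    },
    localData =>
    {
        lock (sync)                            // serialize only the final merge into processedData
        {
            processedData.AddRange(localData);
        }
    });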

Upvotes: 5
