Reputation: 129
Parallel.ForEach helps improve performance; however, I am seeing data loss.
What I tried (the variables results and processedData are both ConcurrentBag<IwrRows>):
1)
Parallel.ForEach(results, () => new ConcurrentBag<IwrRows>(), (n, loopState, localData) =>
{
    return ProcessData(n); // ProcessData contains the complicated business logic
}, (localData) => AddRows(localData, processedData, obj));
2)
await Task.Run(() => Parallel.ForEach(results, item =>
{
    ProcessData(item, processedData);
}));
3)
Parallel.ForEach(results, item =>
{
    ProcessData(item, processedData);
});
All of them lost some rows.
When I use a plain foreach block it consistently returns the same value; however, it is 4 times slower.
foreach (var item in results)
{
    // ProcessData returns a List<IwrRows>
    processedData.AddRange(ProcessData(item));
}
Not sure what I am missing here.
results has 51112 items. foreach returns 41316 rows back; Parallel.ForEach returns 41308, 41313, or 41314, varying with each run.
Upvotes: 10
Views: 4178
Reputation: 448
Well, there might definitely be a problem with your business logic (ProcessData).
It's maybe not a Parallel.ForEach, but I think this one might speed up your code a little; it's async as well, using LINQ.
That's the way I'm handling parallel async operations on some data.
You'll probably need to flatten the result of taskList (it's all pseudo-code written from the top of my head); a sketch of that step follows the code below. You could also use yield return to materialize your list later, which might speed it up a bit more. But use yield with caution :)
var taskList = results.Select(async item =>
{
    return await ProcessData(item, processedData);
});
await Task.WhenAll(taskList);
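For example, the flattening step could look like this. This is only a sketch: it assumes an async ProcessData(item) overload that returns Task<List<IwrRows>> instead of writing into the shared bag, and it needs System.Linq and System.Threading.Tasks:
var taskList = results.Select(item => ProcessData(item));
List<IwrRows>[] perItemLists = await Task.WhenAll(taskList);        // one list per input item
var processedData = perItemLists.SelectMany(rows => rows).ToList(); // flatten into a single list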
Use WhenAll or WaitAll depending on the behavior you want:
Task.WaitAll:
Throws an AggregateException if at least one of the Task instances was canceled, or an exception was thrown during the execution of at least one of the Task instances. If a task was canceled, the AggregateException contains an OperationCanceledException in its InnerExceptions collection.
Task.WhenAll:
If any of the supplied tasks completes in a faulted state, the returned task will also complete in a Faulted state, where its exceptions will contain the aggregation of the set of unwrapped exceptions from each of the supplied tasks.
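As a rough illustration of that difference (a sketch only, inside an async method; it assumes the tasks have been materialized first, e.g. var tasks = taskList.ToArray(), and is not part of the question's code):
try
{
    Task.WaitAll(tasks); // blocks; every failure is wrapped in one AggregateException
}
catch (AggregateException ae)
{
    foreach (var ex in ae.InnerExceptions)
        Console.WriteLine(ex.Message);
}

try
{
    // non-blocking; await rethrows only the first exception, the full
    // aggregate stays on the task returned by Task.WhenAll
    await Task.WhenAll(tasks);
}
catch (Exception ex)
{
    Console.WriteLine(ex.Message);
}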
Upvotes: 0
Reputation: 102
Using await Task.Run in 2) is useless, I think.
If foreach returns 41316 rows back for 51112 results, then the problem is not in Parallel.ForEach but in your adding / processing mechanism. Remember that even though ConcurrentBag guarantees that each operation on it is thread-safe, it doesn't take care of duplicates.
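For example, if the rows are built into a local list first, only the ConcurrentBag<T>.Add calls should touch the shared container. This is a hypothetical shape of ProcessData(item, processedData), not the original code; Item and ComputeRows are placeholders for the actual types and business logic:
void ProcessData(Item item, ConcurrentBag<IwrRows> processedData)
{
    List<IwrRows> local = ComputeRows(item); // per-item work, no shared state
    foreach (var row in local)
        processedData.Add(row);              // thread-safe per call, but no de-duplication
}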
Upvotes: 0
Reputation: 77364
You seem to struggle with getting the results back into a coherent list. You could use PLINQ, so you don't have to bother with the results container being thread-safe:
var processedData = yourData.AsParallel().Select(ProcessData).ToList();
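Since ProcessData in the question returns a List<IwrRows> per item, SelectMany can flatten the per-item lists into a single list in the same call (a sketch using the names from the question; requires System.Linq):
var processedData = results.AsParallel()
                           .SelectMany(item => ProcessData(item))
                           .ToList();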
Upvotes: 7
Reputation: 27944
Your problem seems to be in AddRows(localData, processedData, obj). This method is probably adding the data to a list which is not thread-safe. You should add it to a thread-safe collection or put some synchronization around the adding of the data.
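For instance, the localInit/localFinally overload from attempt 1 could accumulate rows per thread and merge them safely like this. This is only a sketch; it assumes ProcessData(n) returns a List<IwrRows> (as in the question's foreach version) and that processedData is a ConcurrentBag<IwrRows>:
Parallel.ForEach(results,
    () => new List<IwrRows>(),              // per-thread buffer
    (n, loopState, localData) =>
    {
        localData.AddRange(ProcessData(n)); // no shared state touched here
        return localData;                   // keep accumulating on this thread
    },
    localData =>
    {
        foreach (var row in localData)
            processedData.Add(row);         // ConcurrentBag<T>.Add is thread-safe
    });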
Upvotes: 5