fingers10
fingers10

Reputation: 7947

Parallel.ForEach not adding items as expected in ConcurrentBag in C#

In my Asp.Net Core WebApi Controller, I'm receiving a IFormFile[] files. I need to convert this to of List<DocumentData>. I first used foreach. It was working fine. But later decided to change to Parallel.ForEach as I'm receiving many(>5) files.

Here is my DocumentData Class:

public class DocumentData
{
    public byte[] BinaryData { get; set; }
    public string FileName { get; set; }
}

Here is my Parallel.ForEach Logic:

var documents = new ConcurrentBag<DocumentData>();
Parallel.ForEach(files, async (currentFile) =>
{
    if (currentFile.Length > 0)
    {
        using (var ms = new MemoryStream())
        {
            await currentFile.CopyToAsync(ms);
            documents.Add(new DocumentData
            {
                BinaryData = ms.ToArray(),
                FileName = currentFile.FileName
            });
        }
    }
});

For Example, even for two files as input, documents always gives one file as output. Am I missing something?

I initially had List<DocumentData>. I found that it's not thread safe and changed to ConcurrentBag<DocumentData>. But still I'm getting unexpected results. Please assist on where I'm wrong?

Upvotes: 3

Views: 1745

Answers (2)

Farhad Jabiyev
Farhad Jabiyev

Reputation: 26635

I guess it is because, Parallel.Foreach doesn't support async/await. It only takes Action as input and executes it for each item. And in case of async delegates it will execute them in a fire-and-forget manner. In that case passed lambda will be considered as async void function and async void can't be awaited.

If there were overload which takes Func<Task> then it would work.

I suggest you to create Tasks with the help of Select and use Task.WhenAll for executing them at the same time.

For example:

var tasks = files.Select(async currentFile =>
{
    if (currentFile.Length > 0)
    {
        using (var ms = new MemoryStream())
        {
            await currentFile.CopyToAsync(ms);
            documents.Add(new DocumentData
            {
                BinaryData = ms.ToArray(),
                FileName = currentFile.FileName
            });
        }
    }
});

await Task.WhenAll(tasks);

Additionally you can improve that code with just returning DocumentData instance from that method, and in such case there is no need to modify documents collection. Task.WhenAll has overload which takes IEnumerable<Task<TResult> as input and produces Task of TResult array. So, the result will be so:

var tasks = files.Select(async currentFile =>
    {
        if (currentFile.Length > 0)
        {
            using (var ms = new MemoryStream())
            {
                await currentFile.CopyToAsync(ms);
                return new DocumentData
                {
                    BinaryData = ms.ToArray(),
                    FileName = currentFile.FileName
                };
            }
        }

        return null;
    });

var documents =  (await Task.WhenAll(tasks)).Where(d => d != null).ToArray();

Upvotes: 7

TheGeneral
TheGeneral

Reputation: 81503

You had the right idea with a concurrent collection, but misused a TPL method.

In short you need to be very careful about async lambdas, and if you are passing them to an Action or Func<Task>

Your problem is because Parallel.For / ForEach is not suited for the async and await pattern or IO bound tasks. They are suited for cpu bound workloads. Which means they essentially have Action parameters and let's the task scheduler create the tasks for you

If you want to run mutple tasks at the same time use Task.WhenAll , or a TPL Dataflow ActionBlock which can deal effectively with both CPU bound and IO bound works loads, or said more directly, they can deal with tasks which is what an async method is.

The fundimental issue is when you call an async lambda on an Action, you are essentially creating an async void method, which will run as a task unobserved. That's to say, your TPL method is just creating a bunch of tasks in parallel to run a bunch of unobserved tasks and not waiting for them.

Think of it like this, you ask a bunch of friends to go and get you some groceries, they in turn tell someone else to get your groceries, yet your friends report back to you and say thier job is done. It obviously isn't and you have no groceries.

Upvotes: 3

Related Questions