Jaron Johnson
Jaron Johnson

Reputation: 33

Nested async methods in a Parallel.ForEach

I have a method that runs multiple async methods within it. I have to iterate over a list of devices, and pass the device to this method. I am noticing that this is taking a long time to complete so I am thinking of using Parallel.ForEach so it can run this process against multiple devices at the same time.

Let's say this is my method.

public async Task ProcessDevice(Device device) {
    var dev = await _deviceService.LookupDeviceIndbAsNoTracking(device);

    var result = await DoSomething(dev);
    await DoSomething2(dev);
}

Then DoSomething2 also calls an async method.

public async Task DoSomething2(Device dev) {
    foreach(var obj in dev.Objects) {
        await DoSomething3(obj);
    }
}

The list of devices continuously gets larger over time, so the more this list grows, the longer it takes the program to finish running ProcessDevice() against each device. I would like to process more than one device at a time. So I have been looking into using Parallel.ForEach.

Parallel.ForEach(devices, async device => {
    try {
        await ProcessDevice(device);
    } catch (Exception ex) {
        throw ex;
    }
})

It appears that the program is finishing before the device is fully processed. I have also tried creating a list of tasks, and then foreach device, add a new task running ProcessDevice to that list and then awaiting Task.WhenAll(listOfTasks);

var listOfTasks = new List<Task>();
foreach(var device in devices) {
    var task = Task.Run(async () => await ProcessDevice(device));
    listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);

But it appears that the task is marked as completed before ProcessDevice() is actually finished running.

Please excuse my ignorance on this issue as I am new to parallel processing and not sure what is going on. What is happening to cause this behavior and is there any documentation that you could provide that could help me better understand what to do?

Upvotes: 2

Views: 769

Answers (4)

Gabriel Luci
Gabriel Luci

Reputation: 40928

I can explain the problem with Parallel.ForEach. An important thing to understand is that when the await keyword acts on an incomplete Task, it returns. It will return its own incomplete Task if the method signature allows (if it's not void). Then it is up to the caller to use that Task object to wait for the job to finish.

But the second parameter in Parallel.ForEach is an Action<T>, which is a void method, which means no Task can be returned, which means the caller (Parallel.ForEach in this case) has no way to wait until the job has finished.

So in your case, as soon as it hits await ProcessDevice(device), it returns and nothing waits for it to finish so it starts the next iteration. By the time Parallel.ForEach is finished, all it has done is started all the tasks, but not waited for them.

So don't use Parallel.ForEach with asynchronous code.

Stephen's answer is more appropriate. You can also use WSC's answer, but that can be dangerous with larger lists. Creating hundreds or thousands of new threads all at once will not help your performance.

Upvotes: 1

Stephen Cleary
Stephen Cleary

Reputation: 456507

You can't mix async with Parallel.ForEach. Since your underlying operation is asynchronous, you'd want to use asynchronous concurrency, not parallelism. Asynchronous concurrency is most easily expressed with WhenAll:

var listOfTasks = devices.Select(ProcessDevice).ToList();
await Task.WhenAll(listOfTasks);

Upvotes: 7

DevyDev
DevyDev

Reputation: 886

not very sure it this if what you are asking for, but I can give example of how we start async process

 private readonly Func<Worker> _worker;

    private void StartWorkers(IEnumerable<Props> props){
    Parallel.ForEach(props, timestamp => { _worker.Invoke().Consume(timestamp); });
    }

Would recommend reading about Parallel.ForEach as it will do some part for you.

Upvotes: -1

WSC
WSC

Reputation: 992

In your last example there's a few problems:

var listOfTasks = new List<Task>();
foreach (var device in devices)
{
    await  Task.Run(async () => await ProcessDevice(device));
}
await Task.WhenAll(listOfTasks);

Doing await Task.Run(async () => await ProcessDevice(device)); means you are not moving to the next iteration of the foreach loop until the previous one is done. Essentially, you're still doing them one at a time.

Additionally, you aren't adding any tasks to listOfTasks so it remains empty and therefore Task.WhenAll(listOfTasks) completes instantly because there's no tasks to await.

Try this:

var listOfTasks = new List<Task>();
foreach (var device in devices)
{
    var task = Task.Run(async () => await ProcessDevice(device))
    listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);

Upvotes: 1

Related Questions