clausndk

Reputation: 3439

Parallel foreach with asynchronous lambda

I would like to handle a collection in parallel, but I'm having trouble implementing it and I'm therefore hoping for some help.

The trouble arises if I want to call a method marked async in C#, within the lambda of the parallel loop. For example:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
var count = bag.Count;

The problem is that count ends up being 0, because Parallel.ForEach doesn't wait for the asynchronous work: each async lambda returns at its first await, so the loop treats every iteration as finished while the real work is still running in the background. If I remove the async keyword, the method looks like this:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
});
var count = bag.Count;

It works, but it completely defeats the purpose of await, and I have to do some manual exception handling (removed for brevity).

How can I implement a Parallel.ForEach loop that uses the await keyword within the lambda? Is it possible?

The signature of the Parallel.ForEach method takes an Action<T> as a parameter, but I want it to wait for my asynchronous lambda.
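
To make the failure mode concrete, the snippet below is roughly what the compiler does with the first example (Item stands in for the actual element type of myCollection): the async lambda is converted to an async void delegate, so there is no Task for Parallel.ForEach to wait on.

Parallel.ForEach(myCollection, new Action<Item>(async item =>
{
    // Runs synchronously up to the first await...
    var response = await GetData(item);
    // ...then control returns to Parallel.ForEach, which treats the iteration as done.
    bag.Add(response);
}));
var count = bag.Count; // likely 0: the loop "finished" before the awaited calls completed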

Upvotes: 314

Views: 294682

Answers (11)

YI YONG

Reputation: 1

I typically employ two distinct methods for handling parallel tasks, depending on the complexity of the task at hand.

For simpler data types and methods, I use the following approach:

// Initialize a list for processing
List<int> longList = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21 };

// Initialize a thread-safe collection to store responses
// (a plain List<T> is not safe to Add to from multiple tasks at once)
var responses = new ConcurrentBag<object>();
var tasks = longList.Select(async item =>
{
    // Perform some pre-processing
    var response = await GetData(item);
    responses.Add(response);
    // Perform some post-processing
});

await Task.WhenAll(tasks);
int count = responses.Count;

This approach is straightforward and effective for simpler tasks, and it is essentially the same as Stephen Cleary's answer. However, I often need to run more complex tasks in parallel.

For more complex tasks, I use a library that manages all tasks. This library allows me to dynamically adjust the degree of parallelism and includes a retry function in case an error occurs. Here's how I would use it for this case:

// Create an object to hold all async requests (Task in this library)
RequestContainer<OwnRequest> container = new RequestContainer<OwnRequest>();

// Create an object wrapper for each item and add it to the collection
foreach (var item in longList)
    container.Add(new OwnRequest(async (token) => {
        var response = await GetData(item);
        if (response == null)
            return false;
        responses.Add(response);
        return true;
    }));

// Await the completion of all tasks in the collection
await container.Task;
int count = responses.Count;

This library is particularly useful for complex tasks and offers more features than demonstrated in this example. However, it is quite easy to use.

Upvotes: -3

Serge Semenov

Reputation: 10062

You can use the ParallelForEachAsync extension method from the AsyncEnumerator NuGet package:

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;

Disclaimer: I'm the author of the AsyncEnumerator library, which is open source and licensed under MIT, and I'm posting this message just to help the community.

Upvotes: 158

Majid Shahabfar

Reputation: 4829

One of the new .NET 6 APIs is Parallel.ForEachAsync, a way to schedule asynchronous work that allows you to control the degree of parallelism:

var urls = new [] 
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};

var client = new HttpClient();

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);

    var response = await client.GetAsync(url);

    if (response.IsSuccessStatusCode)
    {
        using var target = File.OpenWrite(targetPath);

        await response.Content.CopyToAsync(target);
    }
});

Another example in Scott Hanselman's blog.

The source, for reference.

Upvotes: 164

Alex from Jitbit

Reputation: 60902

Simplest possible extension method, compiled from other answers and the article referenced by the accepted answer:

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync();
        try
        {
            await asyncAction(item).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

UPDATE: here's a simple modification that also supports a cancellation token, as requested in the comments (untested):

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync(cancellationToken);
        if (cancellationToken.IsCancellationRequested) return;

        try
        {
            await asyncAction(item, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

Upvotes: 17

Ganso Doido

Reputation: 607

With SemaphoreSlim you can control the degree of parallelism:

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  await throttler.WaitAsync();
  try
  {
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;

Upvotes: 39

Gravity API

Reputation: 879

For a simpler solution (not necessarily the most optimal), you can simply nest Parallel.ForEach inside a Task.Run, like so:

var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
Task.Run(() =>
{
    Parallel.ForEach(myCollection, options, item =>
    {
        DoWork(item);
    });
});

The ParallelOptions will do the throttling for you, out of the box.

I am using it in a real-world scenario to run very long operations in the background. These operations are triggered via HTTP, and the design keeps the HTTP call from blocking while the long operation is running:

  1. An HTTP call starts the long background operation.
  2. The operation runs in the background.
  3. The caller gets a status ID which can be used to check the status via another HTTP call.
  4. The background operation updates its status as it progresses.

That way, the CI/CD call does not time out because of a long HTTP operation; instead, it polls the status every x seconds without blocking the process.
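
As an illustration only, here is a minimal sketch of that start-and-poll pattern using an ASP.NET Core minimal API. The routes, the in-memory status dictionary, and DoWork are hypothetical placeholders, not part of the original setup:

using System.Collections.Concurrent;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

// In-memory status store; a real implementation would persist this somewhere durable.
var statuses = new ConcurrentDictionary<string, string>();

// Steps 1 + 2: start the long operation in the background and return a status ID immediately.
app.MapPost("/operations", (List<int> myCollection) =>
{
    var id = Guid.NewGuid().ToString();
    statuses[id] = "Running";

    Task.Run(() =>
    {
        try
        {
            var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
            Parallel.ForEach(myCollection, options, item => DoWork(item));
            statuses[id] = "Completed"; // step 4: the background operation updates its status
        }
        catch
        {
            statuses[id] = "Failed";
        }
    });

    return Results.Accepted($"/operations/{id}", id);
});

// Step 3: the caller polls this endpoint with the status ID.
app.MapGet("/operations/{id}", (string id) =>
    statuses.TryGetValue(id, out var status) ? Results.Ok(status) : Results.NotFound());

app.Run();

static void DoWork(int item)
{
    // Placeholder for the long-running per-item work.
    Thread.Sleep(1000);
}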

Upvotes: -2

Caleb Holt

Reputation: 176

The following is set to work with IAsyncEnumerable but can be modified to use IEnumerable by just changing the type and removing the "await" on the foreach. It's far more appropriate for large sets of data than creating countless parallel tasks and then awaiting them all.

    public static async Task ForEachAsyncConcurrent<T>(this IAsyncEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
    {
        // ActionBlock<T> requires the System.Threading.Tasks.Dataflow NuGet package.
        ActionBlock<T> block = new ActionBlock<T>(
           action, 
           new ExecutionDataflowBlockOptions 
           { 
             MaxDegreeOfParallelism = maxDegreeOfParallelism, 
             BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3 
           });

        await foreach (T item in enumerable)
        {
           await block.SendAsync(item).ConfigureAwait(false);
        }

        block.Complete();
        await block.Completion;
    }
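
A hypothetical usage example (assuming the extension method above is in scope; the async iterator and delays are placeholders that simulate real work):

// Produce items asynchronously (stand-in for a real IAsyncEnumerable source).
static async IAsyncEnumerable<int> GetNumbersAsync()
{
    for (var i = 0; i < 1_000; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

// Process at most 8 items concurrently; SendAsync applies backpressure via BoundedCapacity.
await GetNumbersAsync().ForEachAsyncConcurrent(
    async n => await Task.Delay(10),   // stand-in for real async work per item
    maxDegreeOfParallelism: 8);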

Upvotes: 0

Tom

Reputation: 1052

In the accepted answer the ConcurrentBag is not required. Here's an implementation without it:

var tasks = myCollection.Select(GetData).ToList();
await Task.WhenAll(tasks);
var results = tasks.Select(t => t.Result);

The "// some pre stuff" and "// some post stuff" can go into the GetData implementation (or into another method that calls GetData).
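
For example, a hypothetical wrapper (GetDataWithExtras and the Item element type are placeholder names):

async Task<object> GetDataWithExtras(Item item)
{
    // some pre stuff
    var response = await GetData(item);
    // some post stuff
    return response;
}

var tasks = myCollection.Select(GetDataWithExtras).ToList();
await Task.WhenAll(tasks);
var results = tasks.Select(t => t.Result);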

Aside from being shorter, this avoids an "async void" lambda, which is an anti-pattern.

Upvotes: 0

nicolas2008

Reputation: 1052

My lightweight implementation of ParallelForEachAsync.

Features:

  1. Throttling (max degree of parallelism).
  2. Exception handling (an AggregateException will be thrown at completion).
  3. Memory efficient (no need to store the list of tasks).

public static class AsyncEx
{
    public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
    {
        var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
        var tcs = new TaskCompletionSource<object>();
        var exceptions = new ConcurrentBag<Exception>();
        bool addingCompleted = false;

        foreach (T item in source)
        {
            await semaphoreSlim.WaitAsync();
            asyncAction(item).ContinueWith(t =>
            {
                semaphoreSlim.Release();

                if (t.Exception != null)
                {
                    exceptions.Add(t.Exception);
                }

                if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
                {
                    tcs.TrySetResult(null);
                }
            });
        }

        Volatile.Write(ref addingCompleted, true);

        // If every continuation already finished before addingCompleted was set,
        // complete the task here so the await below cannot hang forever.
        if (semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
        {
            tcs.TrySetResult(null);
        }
        await tcs.Task;
        if (exceptions.Count > 0)
        {
            throw new AggregateException(exceptions);
        }
    }
}

Usage example:

await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
    var data = await GetData(i);
}, maxDegreeOfParallelism: 100);

Upvotes: 5

Stephen Cleary

Reputation: 457382

If you just want simple parallelism, you can do this:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

If you need something more complex, check out Stephen Toub's ForEachAsync post.
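
For reference, the helper described in that post is built on partitioners; a sketch of that approach (not a verbatim copy, so check the post for the original) looks like this:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class EnumerableAsyncExtensions
{
    // Splits the source into 'degreeOfParallelism' partitions and runs one async
    // worker loop per partition, so at most that many items are processed at once.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
            select Task.Run(async () =>
            {
                using (partition)
                {
                    while (partition.MoveNext())
                    {
                        await body(partition.Current);
                    }
                }
            }));
    }
}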

Upvotes: 352

Jay Shah

Reputation: 3771

I've created an extension method for this which makes use of SemaphoreSlim and also allows setting a maximum degree of parallelism:

    /// <summary>
    /// Concurrently executes async actions for each item of <see cref="IEnumerable{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">An instance of <see cref="IEnumerable{T}"/>.</param>
    /// <param name="action">An async <see cref="Func{T, Task}"/> to execute for each item.</param>
    /// <param name="maxDegreeOfParallelism">Optional. The maximum degree of parallelism;
    /// must be greater than 0.</param>
    /// <returns>A Task representing the async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If maxDegreeOfParallelism is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Sample Usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

Upvotes: 1
