Reputation: 3439
I would like to handle a collection in parallel, but I'm having trouble implementing it and I'm therefore hoping for some help.
The trouble arises if I want to call a method marked async in C#, within the lambda of the parallel loop. For example:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
});
var count = bag.Count;
The problem is that count is 0, because all the threads created are effectively just background threads and the Parallel.ForEach call doesn't wait for completion. If I remove the async keyword, the method looks like this:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
// some pre stuff
var responseTask = GetData(item);
responseTask.Wait();
var response = responseTask.Result;
bag.Add(response);
// some post stuff
});
var count = bag.Count;
It works, but it completely disables the await cleverness and I have to do some manual exception handling (removed for brevity).
How can I implement a Parallel.ForEach loop that uses the await keyword within the lambda? Is it possible?
The prototype of the Parallel.ForEach method takes an Action<T> as parameter, but I want it to wait for my asynchronous lambda.
Upvotes: 314
Views: 294682
Reputation: 1
I typically employ two distinct methods for handling parallel tasks, depending on the complexity of the task at hand.
For simpler data types and methods, I use the following approach:
// Initialize a list for processing
List<int> longList = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21 };
// Initialize a thread-safe collection to store responses (List<T> is not safe for concurrent Add calls)
ConcurrentBag<object> responses = new ConcurrentBag<object>();
var tasks = longList.Select(async item =>
{
// Perform some pre-processing
var response = await GetData(item);
responses.Add(response);
// Perform some post-processing
});
await Task.WhenAll(tasks);
int count = responses.Count;
This approach is straightforward and effective for simpler tasks, and it is the same as the one in Stephen Cleary's answer. However, I often need to run more complex tasks in parallel.
For more complex tasks, I use a library that manages all tasks. This library allows me to dynamically adjust the degree of parallelism and includes a retry function in case an error occurs. Here's how I would use it for this case:
// Create an object to hold all async requests (Task in this library)
RequestContainer<OwnRequest> container = new RequestContainer<OwnRequest>();
// Create an object wrapper for each item and add it to the collection
foreach (var item in longList)
container.Add(new OwnRequest(async (token) => {
var response = await GetData(item);
if (response == null)
return false;
responses.Add(response);
return true;
}));
// Await the completion of all tasks in the collection
await container.Task;
int count = responses.Count;
This library is particularly useful for complex tasks and offers more features than demonstrated in this example. However, it is quite easy to use.
Upvotes: -3
Reputation: 10062
You can use the ParallelForEachAsync extension method from the AsyncEnumerator NuGet package:
using Dasync.Collections;
var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;
Disclaimer: I'm the author of the AsyncEnumerator library, which is open source and licensed under MIT, and I'm posting this message just to help the community.
Upvotes: 158
Reputation: 4829
One of the new .NET 6 APIs is Parallel.ForEachAsync, a way to schedule asynchronous work that allows you to control the degree of parallelism:
var urls = new []
{
"https://dotnet.microsoft.com",
"https://www.microsoft.com",
"https://stackoverflow.com"
};
var client = new HttpClient();
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
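// A raw URL contains characters that are not valid in a file name, so escape it first
var cacheDir = Path.Combine(Path.GetTempPath(), "http_cache");
Directory.CreateDirectory(cacheDir);
var targetPath = Path.Combine(cacheDir, Uri.EscapeDataString(url));
// Pass the token along so the request is cancelled together with the loop
var response = await client.GetAsync(url, token);
if (response.IsSuccessStatusCode)
{
using var target = File.OpenWrite(targetPath);
await response.Content.CopyToAsync(target, token);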
}
});
Another example in Scott Hanselman's blog.
The source, for reference.
Upvotes: 164
Reputation: 60902
Simplest possible extension method, compiled from other answers and the article referenced by the accepted answer:
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
var tasks = source.Select(async item =>
{
await throttler.WaitAsync();
try
{
await asyncAction(item).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
}
UPDATE: here's a simple modification that also supports a cancellation token, as requested in the comments (untested):
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
var tasks = source.Select(async item =>
{
await throttler.WaitAsync(cancellationToken);
try
{
// Check inside the try block so the semaphore is still released on an early exit
if (cancellationToken.IsCancellationRequested) return;
await asyncAction(item, cancellationToken).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
}
Upvotes: 17
Reputation: 607
With SemaphoreSlim you can control the degree of parallelism.
var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
await throttler.WaitAsync();
try
{
var response = await GetData(item);
bag.Add(response);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
var count = bag.Count;
Upvotes: 39
Reputation: 879
For a simpler solution (not sure if it is the most optimal), you can nest Parallel.ForEach inside a Task, like this:
var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
Task.Run(() =>
{
Parallel.ForEach(myCollection, options, item =>
{
DoWork(item);
});
});
The ParallelOptions will do the throttling for you, out of the box.
I am using it in a real-world scenario to run very long operations in the background. These operations are started via HTTP, and the design avoids blocking the HTTP call while the long operation is running.
That way, the CI/CD call does not time out because of a long HTTP operation. Instead, it polls the status every x seconds without blocking the process.
Upvotes: -2
Reputation: 176
The following is set to work with IAsyncEnumerable but can be modified to use IEnumerable by just changing the type and removing the "await" on the foreach. It's far more appropriate for large sets of data than creating countless parallel tasks and then awaiting them all.
public static async Task ForEachAsyncConcurrent<T>(this IAsyncEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
{
ActionBlock<T> block = new ActionBlock<T>(
action,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism,
BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3
});
await foreach (T item in enumerable)
{
await block.SendAsync(item).ConfigureAwait(false);
}
block.Complete();
await block.Completion;
}
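For reference, here is a minimal sketch of the IEnumerable variant described above, assuming the System.Threading.Tasks.Dataflow assembly is available to the project. The class name DataflowForEach is a hypothetical container, and the early exit when SendAsync returns false is an addition of this sketch rather than part of the code above:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowForEach
{
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int maxDegreeOfParallelism,
        int? boundedCapacity = null)
    {
        var block = new ActionBlock<T>(
            action,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxDegreeOfParallelism,
                BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3
            });

        // Plain foreach instead of await foreach, since the source is synchronous.
        foreach (T item in enumerable)
        {
            // SendAsync waits while the bounded buffer is full and
            // returns false once the block no longer accepts items (e.g. it faulted).
            if (!await block.SendAsync(item).ConfigureAwait(false))
            {
                break;
            }
        }

        block.Complete();
        // Awaiting Completion rethrows any exception raised by the action.
        await block.Completion.ConfigureAwait(false);
    }
}
```

Because the buffer is bounded, the source is only enumerated as fast as the actions complete, so a very large collection is never turned into a very large list of pending tasks.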
Upvotes: 0
Reputation: 1052
In the accepted answer the ConcurrentBag is not required. Here's an implementation without it:
var tasks = myCollection.Select(GetData).ToList();
await Task.WhenAll(tasks);
var results = tasks.Select(t => t.Result);
Any of the "// some pre stuff" and "// some post stuff" can go into the GetData implementation (or another method that calls GetData).
Aside from being shorter, there's no use of an "async void" lambda, which is an anti-pattern.
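As a minimal sketch of that idea, the pre and post steps can live in a wrapper method that is passed to Select. The names Pipeline, ProcessItemAsync and ProcessAllAsync, the int item type, and the stubbed GetData are all hypothetical stand-ins and are not taken from the question:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class Pipeline
{
    // Hypothetical stand-in for the question's GetData; simulates async I/O.
    public static async Task<object> GetData(int item)
    {
        await Task.Delay(10);
        return item * 2;
    }

    // Wrapper that holds the "pre" and "post" steps around GetData.
    public static async Task<object> ProcessItemAsync(int item)
    {
        // some pre stuff
        var response = await GetData(item);
        // some post stuff
        return response;
    }

    // Starts one task per item and awaits them all.
    // Task.WhenAll returns the results in the same order as the input tasks.
    public static async Task<object[]> ProcessAllAsync(IEnumerable<int> items)
    {
        return await Task.WhenAll(items.Select(ProcessItemAsync));
    }
}
```

Calling `await Pipeline.ProcessAllAsync(myCollection)` then gives the responses directly, so no shared collection is needed.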
Upvotes: 0
Reputation: 1052
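My lightweight implementation of an async ParallelForEach.
Features: throttling via maxDegreeOfParallelism, exceptions collected into an AggregateException that is thrown on completion, and no list of tasks kept in memory.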
public static class AsyncEx
{
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
{
var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
var tcs = new TaskCompletionSource<object>();
var exceptions = new ConcurrentBag<Exception>();
bool addingCompleted = false;
foreach (T item in source)
{
await semaphoreSlim.WaitAsync();
asyncAction(item).ContinueWith(t =>
{
semaphoreSlim.Release();
if (t.Exception != null)
{
exceptions.Add(t.Exception);
}
if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
{
tcs.TrySetResult(null);
}
});
}
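Volatile.Write(ref addingCompleted, true);
// If every action already finished (or the source was empty), no continuation is left to signal completion
if (semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
{
tcs.TrySetResult(null);
}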
await tcs.Task;
if (exceptions.Count > 0)
{
throw new AggregateException(exceptions);
}
}
}
Usage example:
await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
var data = await GetData(i);
}, maxDegreeOfParallelism: 100);
Upvotes: 5
Reputation: 457382
If you just want simple parallelism, you can do this:
var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;
If you need something more complex, check out Stephen Toub's ForEachAsync post.
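For a rough idea of what "more complex" can look like, here is a sketch of one partition-based approach that limits the degree of parallelism. It is written from memory in the spirit of that post and is not a verbatim copy; the class name PartitionedForEach and the parameter name degreeOfParallelism are assumptions:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionedForEach
{
    public static Task ForEachAsync<T>(
        this IEnumerable<T> source,
        int degreeOfParallelism,
        Func<T, Task> body)
    {
        // Split the source into a fixed number of partitions and run one
        // worker task per partition. Each worker awaits its items one at a
        // time, so at most degreeOfParallelism bodies run concurrently.
        return Task.WhenAll(
            Partitioner.Create(source)
                .GetPartitions(degreeOfParallelism)
                .Select(partition => Task.Run(async () =>
                {
                    using (partition)
                    {
                        while (partition.MoveNext())
                        {
                            await body(partition.Current);
                        }
                    }
                })));
    }
}
```

Usage would look like `await myCollection.ForEachAsync(10, async item => bag.Add(await GetData(item)));`. Note that in this sketch an exception in one body stops only the worker for that partition; the other workers keep going until their partitions are exhausted.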
Upvotes: 352
Reputation: 3771
I've created an extension method for this which makes use of SemaphoreSlim and also allows setting the maximum degree of parallelism:
/// <summary>
/// Concurrently executes async actions for each item of <see cref="IEnumerable{T}"/>
/// </summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable{T}"/></param>
/// <param name="action">an async action (<see cref="Func{T, Task}"/>) to execute</param>
/// <param name="maxDegreeOfParallelism">Optional, an integer that represents the maximum degree of parallelism,
/// must be greater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If maxDegreeOfParallelism is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
this IEnumerable<T> enumerable,
Func<T, Task> action,
int? maxDegreeOfParallelism = null)
{
if (maxDegreeOfParallelism.HasValue)
{
using (var semaphoreSlim = new SemaphoreSlim(
maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
{
var tasksWithThrottler = new List<Task>();
foreach (var item in enumerable)
{
// Increment the number of currently running tasks and wait if they are more than limit.
await semaphoreSlim.WaitAsync();
tasksWithThrottler.Add(Task.Run(async () =>
{
try
{
await action(item);
}
finally
{
// action is completed (or has failed), so decrement the number of currently running tasks;
// try/finally also lets exceptions from action propagate instead of being swallowed
semaphoreSlim.Release();
}
}));
}
// Wait for all tasks to complete.
await Task.WhenAll(tasksWithThrottler.ToArray());
}
}
else
{
await Task.WhenAll(enumerable.Select(item => action(item)));
}
}
Sample Usage:
await enumerable.ForEachAsyncConcurrent(
async item =>
{
await SomeAsyncMethod(item);
},
5);
Upvotes: 1