Reputation: 67195
I have the following method:
private async Task<string> MakeRequestAsync(string id)
{
// ...
}
And I have a list of user-supplied IDs.
I want to call MakeRequestAsync()
once for each ID. But I'd like use as many parallel tasks as is reasonable. I need the result from each call, and I want to detect exceptions.
I've looked at a lot of examples, but don't see how to do this. For example, I found Parallel.ForEach()
, but the body
argument is an Action<>
and does not return any value. Also, I'm not sure what is the maximum number of tasks I should create.
Can anyone show me how you'd do this? Or perhaps provide a link to a good article?
Upvotes: 0
Views: 394
Reputation: 5805
You can easily do it with await Task.WhenAll
, but I often use the convenience methods below. You can use them like this:
(string, ArgumentException)[] results = await idList
.ForEachParallelSafe<string, ArgumentException>(MakeRequestAsync);
The order of results is the same as the order of the source collection.
If you are ok with just the first exception being thrown, you can also just do this:
string[] results = await idList
.ForEachParallel(MakeRequestAsync);
The optional degreeOfParallelism
parameter lets you restrict the maximum number of parallel executions. There is also an overload with an optional elementSelector
parameter in case you need to combine each original input element with the output, for example if you need to create a Dictionary
.
Here is the code (I've included all the overloads for your convenience, you can also just pick the ones that you need):
/// <summary>
/// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
/// awaits the result of executions. If the execution of <paramref name="asyncFunc" /> throws an exception of type
/// <typeparamref name="TException" />, it is caught and returned in the result.
/// </summary>
public static Task<(TOut Result, TException Exception)[]>
ForeachParallelSafe<TIn, TOut, TException>(this IEnumerable<TIn> source,
Func<TIn, Task<TOut>> asyncFunc, int degreeOfParallelism = -1)
where TException : Exception
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
async Task<(TOut Result, TException Exception)> safeFunc(TIn input)
{
try
{
return (await asyncFunc(input), null);
}
catch (TException e)
{
return (default, e);
}
}
return ForeachParallel(source, safeFunc, (orig, output) => output, degreeOfParallelism);
}
/// <summary>
/// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
/// awaits the result of executions. The returned items are the result of applying <paramref name="elementSelector" />
/// to each of the original items and the resulting items.
/// If a task throws an exception, only the first such exception is thrown by this method after all tasks have completed.
/// Consider using <see cref="ForeachParallelSafe{TIn,TOut,TException}"/> if all exceptions are needed.
/// </summary>
public static async Task<TResult[]> ForeachParallel<T, TOut, TResult>(this IEnumerable<T> source,
Func<T, Task<TOut>> asyncFunc, Func<T, TOut, TResult> elementSelector,
int degreeOfParallelism = -1)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
// Copy the source into an array to avoid multiple enumerations.
// Could be optimized to avoid copying in certain cases but this
// is usually negligible compared to the async operation.
T[] sourceCopy = source.ToArray();
SemaphoreSlim semaphore = null;
if (degreeOfParallelism > 0)
{
semaphore = new SemaphoreSlim(degreeOfParallelism, degreeOfParallelism);
}
TOut[] intermediateResults = await Task.WhenAll(sourceCopy
.Select(async x =>
{
if (semaphore != null)
{
await semaphore.WaitAsync();
}
try
{
return await asyncFunc(x);
}
finally
{
semaphore?.Release();
}
}));
TResult[] result = sourceCopy
.Select((x, index) => elementSelector(x, intermediateResults[index]))
.ToArray();
semaphore?.Dispose();
return result;
}
/// <summary>
/// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
/// awaits the end of all executions.
/// If a task throws an exception, only the first such exception is thrown by this method after all tasks have completed.
/// Consider using <see cref="ForeachParallelSafe{TIn,TOut,TException}"/> if all exceptions are needed.
/// </summary>
public static Task ForeachParallel<T>(this IEnumerable<T> source,
Func<T, Task> asyncFunc, int degreeOfParallelism = -1)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
Task<int> asyncTask(T t) => asyncFunc(t).ContinueWith(_ => 0);
return ForeachParallel(source, asyncTask, (orig, output) => output, degreeOfParallelism);
}
/// <summary>
/// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
/// awaits the result of executions.
/// If a task throws an exception, only the first such exception is thrown by this method after all tasks have completed.
/// Consider using <see cref="ForeachParallelSafe{TIn,TOut,TException}"/> if all exceptions are needed.
/// </summary>
public static Task<TOut[]> ForeachParallel<T, TOut>(this IEnumerable<T> source,
Func<T, Task<TOut>> asyncFunc, int degreeOfParallelism = -1)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
return ForeachParallel(source, asyncFunc, (orig, output) => output, degreeOfParallelism);
}
Upvotes: 2