Reputation: 28435
We are using the ForEachAsync method from "Nesting await in Parallel.ForEach", originally suggested by Stephen Toub (at the bottom of his blog post).
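using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static async Task ForEachAsync<T>(
    this IEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body, Action<Task> handleException = null)
{
    if (source.Any())
    {
        // One worker per partition; each worker awaits its items one at a time,
        // so at most degreeOfParallelism bodies run concurrently.
        await Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current).ContinueWith(t =>
                        {
                            // Observe exceptions
                            if (t.IsFaulted)
                            {
                                handleException?.Invoke(t);
                            }
                        });
            }));
    }
}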
But one of our colleagues is concerned about the Task.Run overhead described in Stephen Cleary's series of posts: https://blog.stephencleary.com/2013/11/taskrun-etiquette-examples-even-in.html
There are (at least) four efficiency problems introduced as soon as you use await with Task.Run in ASP.NET:
• Extra (unnecessary) thread switching to the Task.Run thread pool thread. Similarly, when that thread finishes the request, it has to enter the request context (which is not an actual thread switch but does have overhead).
• Extra (unnecessary) garbage is created. Asynchronous programming is a tradeoff: you get increased responsiveness at the expense of higher memory usage. In this case, you end up creating more garbage for the asynchronous operations that is totally unnecessary.
• The ASP.NET thread pool heuristics are thrown off by Task.Run “unexpectedly” borrowing a thread pool thread. I don’t have a lot of experience here, but my gut instinct tells me that the heuristics should recover well if the unexpected task is really short and would not handle it as elegantly if the unexpected task lasts more than two seconds.
• ASP.NET is not able to terminate the request early, i.e., if the client disconnects or the request times out. In the synchronous case, ASP.NET knew the request thread and could abort it. In the asynchronous case, ASP.NET is not aware that other secondary thread pool thread is “for” that request. It is possible to fix this by using cancellation tokens, but that’s outside the scope of this blog post.
My question: is it OK to use Task.Run in ForEachAsync, or is there a better way to run multiple asynchronous tasks in parallel with a controlled DOP (degree of parallelism)? For example, I want to process 400 items, but run no more than 100 of them in parallel.
We use the ForEachAsync method in both .NET and .NET Core environments, so if the answers differ between environments, I would be happy to know both.
Update to clarify the technologies we are using:
We have Windows services/console apps (written in .NET 4.6.1) that read thousands of records from the DB and then publish them individually, in parallel (e.g. DOP = 100), to a Web API service (we have considered sending them in batches but haven’t implemented that yet).
We also have ASP.NET Core services with a background hosted service that regularly (e.g. every 10 seconds) pulls a page of items (e.g. up to 400) and then saves them in parallel (e.g. DOP = 100) to individual Azure blobs.
UPDATE: In .NET 6, consider using the new Parallel.ForEachAsync API, a way to schedule asynchronous work that lets you control the degree of parallelism.
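A minimal sketch of Parallel.ForEachAsync applied to the 400 items / DOP 100 scenario from the question, assuming .NET 6 or later. The class and method names (ParallelForEachAsyncSketch, RunAsync), the concurrency counter, and the Task.Delay placeholder are illustrative only; in real code the body would be the Web API call or blob upload.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelForEachAsyncSketch
{
    // Hypothetical helper: processes `count` items with at most `maxDop` bodies
    // in flight and returns the highest concurrency actually observed.
    public static async Task<int> RunAsync(int count, int maxDop)
    {
        var gate = new object();
        int current = 0, maxObserved = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDop };

        await Parallel.ForEachAsync(Enumerable.Range(0, count), options, async (item, ct) =>
        {
            lock (gate)
            {
                current++;
                maxObserved = Math.Max(maxObserved, current);
            }

            // Placeholder for the real asynchronous work (HTTP POST, blob upload, ...).
            await Task.Delay(10, ct);

            lock (gate) { current--; }
        });

        return maxObserved;
    }
}
```

Usage: int max = await ParallelForEachAsyncSketch.RunAsync(400, 100); the value should never exceed 100. The cancellation token passed to the body is a convenient place to flow a request or host shutdown token.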
Upvotes: 1
Views: 454
Reputation: 117064
You might want to consider using Microsoft's Reactive Framework (aka Rx): add the NuGet package System.Reactive and using System.Reactive.Linq;, then you can do this:
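using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static async Task ForEachAsync<T>(
    this IEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body)
{
    await source
        .ToObservable()
        // FromAsync is lazy: body(t) starts only when Merge subscribes to it.
        .Select(t => Observable.FromAsync(() => body(t)))
        // Merge subscribes to at most degreeOfParallelism inner observables at a time.
        .Merge(degreeOfParallelism)
        // Avoids an InvalidOperationException when source is empty.
        .DefaultIfEmpty()
        .LastAsync();
}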
You'd have to change your error handling, but that's possible.
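One possible way to restore per-item error reporting in the Rx version, sketched under the assumption that failures should be reported through a callback and the remaining items should keep running. Unlike the question's handleException, this hypothetical callback receives an Exception rather than a Task. Each inner observable's error is caught with Catch and replaced by an empty sequence, so one failing item does not terminate the Merge. It requires the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class RxForEachExtensions
{
    // Assumption: handleException takes an Exception (not a Task as in the question).
    public static async Task ForEachAsync<T>(
        this IEnumerable<T> source,
        int degreeOfParallelism,
        Func<T, Task> body,
        Action<Exception> handleException = null)
    {
        await source
            .ToObservable()
            .Select(item => Observable
                .FromAsync(() => body(item))
                // Report the failed item and complete it quietly, so Merge keeps going.
                .Catch((Exception ex) =>
                {
                    handleException?.Invoke(ex);
                    return Observable.Empty<Unit>();
                }))
            .Merge(degreeOfParallelism)
            // Needed when source is empty or every item failed (no elements reach LastAsync).
            .DefaultIfEmpty()
            .LastAsync();
    }
}
```

With this design a failing item is reported and skipped; if you would rather stop on the first failure, drop the Catch and let the exception propagate out of the await.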
Upvotes: 0
Reputation: 7091
An easy way to handle 400 messages with an MDOP of 100 asynchronously would be to use an ActionBlock<T>. Something like this would work:
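using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class ActionBlockExample
{
    private readonly ActionBlock<int> actionBlock;

    public ActionBlockExample()
    {
        // Runs ProcessMsg for each posted message, at most 100 at a time.
        actionBlock = new ActionBlock<int>(x => ProcessMsg(x), new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 100
        });
    }

    public async Task Process()
    {
        foreach (var msg in Enumerable.Range(0, 400))
        {
            await actionBlock.SendAsync(msg);
        }
        // Signal that no more messages will arrive, then wait for the in-flight ones to finish.
        actionBlock.Complete();
        await actionBlock.Completion;
    }

    // Placeholder for the real asynchronous work.
    private Task ProcessMsg(int msg) => Task.Delay(100);
}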
The ActionBlock has an unbounded input buffer by default and will accept all 400 messages, processing a maximum of 100 in parallel. There's no need for Task.Run here, because the block itself schedules the processing of messages in the background.
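With an unbounded buffer, SendAsync in the example above accepts every message immediately. If the source is large (such as the thousands of DB records mentioned in the question), you may prefer the producer to slow down instead of buffering everything. A sketch, assuming the same Task.Delay placeholder: setting BoundedCapacity makes SendAsync wait asynchronously while the block is full. The class name, method name, and the capacity of maxDop * 2 are hypothetical choices, not part of the answer above.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedActionBlockSketch
{
    // Hypothetical helper: posts messageCount messages and returns how many were processed.
    public static async Task<int> ProcessAsync(int messageCount, int maxDop)
    {
        int processed = 0;

        var block = new ActionBlock<int>(
            async msg =>
            {
                // Placeholder for the real asynchronous work.
                await Task.Delay(10);
                Interlocked.Increment(ref processed);
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxDop,
                // Assumed value: the block holds at most this many messages at once,
                // so SendAsync waits asynchronously instead of buffering everything.
                BoundedCapacity = maxDop * 2
            });

        foreach (var msg in Enumerable.Range(0, messageCount))
        {
            // SendAsync returns false if the block declines the message
            // permanently (for example, after it has faulted).
            if (!await block.SendAsync(msg))
            {
                break;
            }
        }

        block.Complete();
        await block.Completion; // Rethrows if a message handler threw.
        return processed;
    }
}
```

Using SendAsync rather than Post matters here: with a bounded block, Post simply returns false when the block is full, while SendAsync waits for room.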
Upvotes: 1