Michael Freidgeim

Is it ok to use Task.Run in ForEachAsync?

We are using the ForEachAsync method from "Nesting await in Parallel.ForEach", originally suggested by Stephen Toub (at the bottom of his blog post).

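using System;
using System.Collections.Concurrent; // Partitioner
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static async Task ForEachAsync<T>(
    this IEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body, Action<Task> handleException = null)
{
    if (source.Any())
    {
        // Split the source into degreeOfParallelism partitions and start one
        // Task.Run worker per partition; each worker awaits its items one at a time.
        await Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current).ContinueWith(t =>
                        {
                            // Observe exceptions, so a faulted body does not stop this worker
                            if (t.IsFaulted)
                            {
                                handleException?.Invoke(t);
                            }
                        });
            }));
    }
}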

But one of our colleagues is concerned about the Task.Run overhead described in Stephen Cleary's series of posts: https://blog.stephencleary.com/2013/11/taskrun-etiquette-examples-even-in.html

There are (at least) four efficiency problems introduced as soon as you use await with Task.Run in ASP.NET:
• Extra (unnecessary) thread switching to the Task.Run thread pool thread. Similarly, when that thread finishes the request, it has to enter the request context (which is not an actual thread switch but does have overhead).
• Extra (unnecessary) garbage is created. Asynchronous programming is a tradeoff: you get increased responsiveness at the expense of higher memory usage. In this case, you end up creating more garbage for the asynchronous operations that is totally unnecessary.
• The ASP.NET thread pool heuristics are thrown off by Task.Run “unexpectedly” borrowing a thread pool thread. I don’t have a lot of experience here, but my gut instinct tells me that the heuristics should recover well if the unexpected task is really short and would not handle it as elegantly if the unexpected task lasts more than two seconds.
• ASP.NET is not able to terminate the request early, i.e., if the client disconnects or the request times out. In the synchronous case, ASP.NET knew the request thread and could abort it. In the asynchronous case, ASP.NET is not aware that other secondary thread pool thread is “for” that request. It is possible to fix this by using cancellation tokens, but that’s outside the scope of this blog post.

My question: is it OK to use Task.Run in ForEachAsync, or is there a better way to run multiple asynchronous tasks in parallel with a controlled DOP (degree of parallelism)? For example, I want to process 400 items, but run no more than 100 of them in parallel.
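For comparison, one widely used way to cap concurrency without Task.Run is to throttle with a SemaphoreSlim. This is a swapped-in technique, not the question's ForEachAsync; the helper name ThrottledForEachAsync and the demo counters are hypothetical. It is a minimal sketch assuming C# 9+ top-level statements, with Task.Delay standing in for real I/O:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not from the question): start a body for each item,
// but wait on a SemaphoreSlim so that at most degreeOfParallelism bodies
// are in flight at any moment. No Task.Run is used.
static async Task ThrottledForEachAsync<T>(
    IEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body)
{
    using var throttler = new SemaphoreSlim(degreeOfParallelism, degreeOfParallelism);
    var tasks = new List<Task>();

    async Task RunOneAsync(T item)
    {
        try { await body(item); }
        finally { throttler.Release(); } // free the slot even if body throws
    }

    foreach (var item in source)
    {
        await throttler.WaitAsync(); // wait for a free slot before starting the next item
        tasks.Add(RunOneAsync(item));
    }

    // Wait for the remaining bodies; a faulted body surfaces here as an exception.
    await Task.WhenAll(tasks);
}

// Demo: 400 items, at most 100 running concurrently.
object gate = new();
int current = 0, peak = 0, processed = 0;

await ThrottledForEachAsync(Enumerable.Range(0, 400), 100, async i =>
{
    lock (gate) { current++; peak = Math.Max(peak, current); }
    await Task.Delay(10); // stand-in for real I/O
    lock (gate) { current--; processed++; }
});

Console.WriteLine($"processed={processed}, peak={peak}");
```

Every item still gets its own Task, but the loop only starts a new one after a slot is released, so no more than 100 bodies are in flight. Unlike the handleException callback in the question's version, a failing body here is only surfaced when Task.WhenAll is awaited.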

We are using the ForEachAsync method in both .NET Framework and .NET Core environments, so if the answers differ between environments, I would be happy to know both.

Update to clarify the technologies we are using:
We have Windows services/console apps (written in .NET 4.6.1) that read thousands of records from the DB and then publish them individually, in parallel (e.g. dop=100), to a Web API service (we have considered sending them in batches, but haven't implemented that yet).
We also have ASP.NET Core services with a background hosted service that regularly (e.g. every 10 sec) pulls a page of items (e.g. up to 400) and then, in parallel (e.g. dop=100), saves them to individual Azure blobs.

UPDATE: In .NET 6, consider using the new Parallel.ForEachAsync API, which schedules asynchronous work while letting you control the degree of parallelism.
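A minimal sketch of that API applied to the 400-item, dop=100 example above, assuming .NET 6 or later with C# top-level statements; Task.Delay stands in for the real Web API call or blob upload, and the counters exist only to show the cap:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

object gate = new();
int current = 0, peak = 0, processed = 0;

// Cap the number of concurrently running bodies at 100.
var options = new ParallelOptions { MaxDegreeOfParallelism = 100 };

// Each body receives the item and a CancellationToken and returns a ValueTask,
// so an async lambda can be passed directly.
await Parallel.ForEachAsync(Enumerable.Range(0, 400), options, async (item, ct) =>
{
    lock (gate) { current++; peak = Math.Max(peak, current); }
    await Task.Delay(10, ct); // stand-in for real I/O such as an HTTP call or blob upload
    lock (gate) { current--; processed++; }
});

Console.WriteLine($"processed={processed}, peak={peak}");
```

If no ParallelOptions are passed, Parallel.ForEachAsync limits itself to Environment.ProcessorCount concurrent bodies, which is usually much lower than a dop of 100 for I/O-bound work, so setting MaxDegreeOfParallelism explicitly matters here.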

Answers (2)

Enigmativity

You might want to consider using Microsoft's Reactive Framework (aka Rx): add the NuGet package System.Reactive and using System.Reactive.Linq; then you can do this:

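using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static async Task ForEachAsync<T>(
    this IEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body)
{
    await source
        .ToObservable()
        // FromAsync defers each body call until Merge subscribes to it
        .Select(t => Observable.FromAsync(() => body(t)))
        .Merge(degreeOfParallelism) // at most degreeOfParallelism bodies in flight
        .DefaultIfEmpty()           // avoid LastAsync failing on an empty source
        .LastAsync();
}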

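You'd have to change your error handling (in this version, an exception thrown by any body faults the returned task instead of being passed to a handler), but that's possible.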

JSteward

An easy way to process 400 messages asynchronously with a maximum degree of parallelism (MDOP) of 100 is to use an ActionBlock<T>. Something like this would work:

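using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // TPL Dataflow (System.Threading.Tasks.Dataflow package)

public class ActionBlockExample
{
    private readonly ActionBlock<int> actionBlock;

    public ActionBlockExample()
    {
        // Up to 100 messages are processed concurrently; ProcessMsg returns a Task,
        // so a message counts as in progress until that Task completes.
        actionBlock = new ActionBlock<int>(x => ProcessMsg(x), new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 100
        });
    }

    public async Task Process()
    {
        foreach (var msg in Enumerable.Range(0, 400))
        {
            await actionBlock.SendAsync(msg);
        }
        // Signal that no more messages will arrive, then wait for the buffer to drain.
        actionBlock.Complete();
        await actionBlock.Completion;
    }

    // Placeholder for the real asynchronous work
    private Task ProcessMsg(int msg) => Task.Delay(100);
}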

The ActionBlock has an unbounded input buffer by default, so it will accept all 400 messages while processing at most 100 in parallel. There's no need for Task.Run here, because the block itself schedules the message handlers in the background (on the thread pool by default).
