Kind Contributor
Kind Contributor

Reputation: 18533

Doesn't await when using ForEachAsync with await inside Action

The following should return "C", but it returns "B"

using System.Data.Entity;
//...
var state = "A";
var qry = (from f in db.myTable select f);
await qry.ForEachAsync(async (myRecord) => {
   await DoStuffAsync(myRecord);
   state = "B";
});
state = "C";
return state;

It doesn't wait for DoStuffAsync to complete, state="C" runs through and then later state="B" executes (because inside it is still awaiting).

Upvotes: 9

Views: 17481

Answers (3)

A. Jesus Flores
A. Jesus Flores

Reputation: 211

.Net 6 Introduced Parallel.ForEachAsync(...):

var state = "A";
var qry = (from f in db.myTable select f);
await Parallel.ForEachAsync(qry, async (myRecord, token) =>
{
    await DoStuffAsync(myRecord);
    state = "B";
});
state = "C";
return state;

You can set a MaxDegreeOfParallelism property with a ParallelOptions parameter, but usually you don't need it.

From Microsoft Docs: https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.paralleloptions.maxdegreeofparallelism?view=net-6.0

By default, For and ForEach will utilize however many threads the underlying scheduler provides, so changing MaxDegreeOfParallelism from the default only limits how many concurrent tasks will be used.

Generally, you do not need to modify this setting....

Upvotes: 1

Rafi Henig
Rafi Henig

Reputation: 6424

Since DbSet implements IAsyncEnumerable, consider using the following extension method:

public async static Task ForEachAsync<T>(this IAsyncEnumerable<T> source, Func<T, Task> action, CancellationToken cancellationToken = default)
{
    if (source == null) return;
    await foreach (T item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
    {
        await action(item);
    }
}

Usage:

var qry = (from f in db.myTable select f);
await qry
     .AsAsyncEnumerable()
     .ForEachAsync(async arg =>
     {
         await DoStuffAsync(arg);
     });

Upvotes: 0

Kind Contributor
Kind Contributor

Reputation: 18533

That's because the implementation of ForEachAsync doesn't await the delegated action

moveNextTask = enumerator.MoveNextAsync(cancellationToken);
action(current);

see https://github.com/mono/entityframework/blob/master/src/EntityFramework/Infrastructure/IDbAsyncEnumerableExtensions.cs#L19

But that is because, you can't await an action, the delegate needs to be a Func which returns a Task - see How do you implement an async action delegate method?

Therefore, until Microsoft provides a signature which includes a Func delegate and calls it with await, you'll have to roll your own extension method. I'm using the following at the moment.

public static async Task ForEachAsync<T>(
    this IQueryable<T> enumerable, Func<T, Task> action, CancellationToken cancellationToken) //Now with Func returning Task
{
    var asyncEnumerable = (IDbAsyncEnumerable<T>)enumerable;
    using (var enumerator = asyncEnumerable.GetAsyncEnumerator())
    {

        if (await enumerator.MoveNextAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false))
        {
            Task<bool> moveNextTask;
            do
            {
                var current = enumerator.Current;
                moveNextTask = enumerator.MoveNextAsync(cancellationToken);
                await action(current); //now with await
            }
            while (await moveNextTask.ConfigureAwait(continueOnCapturedContext: false));
        }
    }
}

With this, the original test code in your OP will work as expected.

Upvotes: 14

Related Questions