user2687506

Reputation: 800

How to resolve lists of IAsyncEnumerables as soon as an item is ready

public async IAsyncEnumerable<Entity> FindByIds(List<string> ids)
{
    // Split is a custom helper (not shown) that chunks the ids into lists of 5
    List<List<string>> splitIdsList = ids.Split(5);

    var entityList = splitIdsList.Select(x => FindByIdsQuery(x)).ToList();

    // Each chunk's query is enumerated in turn, one after another
    foreach (var entities in entityList)
    {
        await foreach (var entity in entities)
        {
            yield return entity;
        }
    }
}

private async IAsyncEnumerable<Entity> FindByIdsQuery(List<string> ids)
{
    var result = await Connection.QueryAsync(query, new { ids });

    foreach (var entity in result)
    {
        yield return entity;
    }
}

If I send 25 ids to this function, it runs five queries. If the first FindByIdsQuery takes 5000 ms and the other four take 100 ms each, this solution won't output any entities until after 5000 ms. Is there any way to start outputting entities as soon as there are any to output? Or could you do something like Task.WhenAny, as you would with tasks?

To be clear: any of the five queries can take 5000 ms.
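For plain tasks, the Task.WhenAny approach mentioned above looks roughly like the following minimal sketch: it repeatedly waits for whichever task finishes first and collects results in completion order. CompletionOrder and SimulatedQueryAsync are illustrative names; SimulatedQueryAsync is a hypothetical stand-in for a slow database call.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CompletionOrder
{
    // Awaits a set of already-started tasks and returns their results in the
    // order the tasks finish, not the order in which they were passed in.
    public static async Task<List<T>> InCompletionOrderAsync<T>(IEnumerable<Task<T>> tasks)
    {
        var pending = tasks.ToList();
        var results = new List<T>();

        while (pending.Count > 0)
        {
            // Task.WhenAny completes as soon as any one of the pending tasks has finished.
            Task<T> finished = await Task.WhenAny(pending);
            pending.Remove(finished);

            // Awaiting a finished task returns immediately and rethrows its exception, if any.
            results.Add(await finished);
        }

        return results;
    }

    // Hypothetical stand-in for a query: returns `value` after `delayMs` milliseconds.
    public static async Task<string> SimulatedQueryAsync(string value, int delayMs)
    {
        await Task.Delay(delayMs);
        return value;
    }
}
```

With delays of 500, 10 and 150 ms, the results should come back as "fast", "medium", "slow". Each Task.WhenAny call scans the whole pending list, which is fine for a handful of queries such as the five here.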

Upvotes: 1

Views: 2878

Answers (2)

TomTom

Reputation: 62093

The problem is that your code makes them wait. There is no point in an async foreach here, because you are not actually doing anything asynchronously.

You do this:

var entityList = splitIdsList.Select(x => FindByIdsQuery(x)).ToList();

This is the part of the query that could run asynchronously, but it does not, because you materialize the whole result set into a list. You then loop over it asynchronously, but at that point all the results are already in memory.

To get asynchronous behavior, simply get rid of ToList: pass the query straight into the foreach instead of materializing it in memory. The async foreach should run against the EF-level query (not the query result), so that you can process information as it arrives from the database. ToList effectively bypasses this.

Also understand that EF cannot efficiently handle lookups of many ids. The only way it can do this is to put them into an array and call Contains, which translates to a SQL IN clause: terribly inefficient for larger numbers of ids, as it can force a table scan. The efficient SQL approach would be to load the ids into a table-valued variable with statistics and join against it, but there is no way to do that in EF; it is one of its limitations. The SQL limitations of long IN clauses are well documented; the limitations on the EF side are not, but they exist all the same.

Upvotes: 1

Euphoric

Reputation: 12849

From your comments, I now understand your problem. What you are basically looking for is a kind of "SelectMany" operator that starts awaiting all of the IAsyncEnumerables at once and returns items in the order in which they arrive, irrespective of the order of the source async enumerables.

I was hoping that the default AsyncEnumerable.SelectMany does this, but it does not: it goes through the source enumerables one by one and enumerates each inner enumerable completely before continuing to the next. So I hacked together a SelectMany variant that awaits all inner async enumerables at the same time. Be warned: I guarantee neither correctness nor safety, and there is essentially no error handling.

/// <summary>
/// Starts all inner IAsyncEnumerables and returns items from all of them in the order in which they arrive.
/// </summary>
public static async IAsyncEnumerable<TItem> SelectManyAsync<TItem>(IEnumerable<IAsyncEnumerable<TItem>> source)
{
    // get enumerators from all inner IAsyncEnumerables
    var enumerators = source.Select(x => x.GetAsyncEnumerator()).ToList();

    var runningTasks = new List<Task<(IAsyncEnumerator<TItem>, bool)>>();

    try
    {
        // start all inner IAsyncEnumerables
        foreach (var asyncEnumerator in enumerators)
        {
            runningTasks.Add(MoveNextWrapped(asyncEnumerator));
        }

        // while there are any running tasks
        while (runningTasks.Any())
        {
            // get the next finished task and remove it from the list
            var finishedTask = await Task.WhenAny(runningTasks);
            runningTasks.Remove(finishedTask);

            // get the result from the finished IAsyncEnumerator
            var (asyncEnumerator, hasItem) = await finishedTask;

            // if it has an item, return it and put it back as running for the next item
            if (hasItem)
            {
                yield return asyncEnumerator.Current;

                runningTasks.Add(MoveNextWrapped(asyncEnumerator));
            }
        }
    }
    finally
    {
        // let any MoveNextAsync still in flight (e.g. after an early break or an
        // exception) finish first, since disposing an enumerator mid-MoveNextAsync is not safe
        foreach (var task in runningTasks)
        {
            try { await task; } catch { /* ignored during cleanup */ }
        }

        foreach (var asyncEnumerator in enumerators)
        {
            await asyncEnumerator.DisposeAsync();
        }
    }
}

/// <summary>
/// Helper method that returns a Task with a tuple of the IAsyncEnumerator and the result of its MoveNextAsync.
/// </summary>
private static async Task<(IAsyncEnumerator<TItem>, bool)> MoveNextWrapped<TItem>(IAsyncEnumerator<TItem> asyncEnumerator)
{
    var res = await asyncEnumerator.MoveNextAsync();
    return (asyncEnumerator, res);
}

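You can then use it to merge all the enumerables instead of the first foreach. Because FindByIds now returns the merged sequence directly rather than yielding items itself, it must no longer be marked async:

    public IAsyncEnumerable<Entity> FindByIds(List<string> ids)
    {
        List<List<string>> splitIdsList = ids.Split(5);

        return SelectManyAsync(splitIdsList.Select(x => FindByIdsQuery(x)));
    }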
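A different way to get the same merge behavior is to let each source push its items into a shared channel and read from that. This is a swapped-in technique, not the answerer's code: a minimal sketch assuming .NET Core 3.0 or later, where System.Threading.Channels and ChannelReader<T>.ReadAllAsync are available. AsyncMerge and MergeAsync are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class AsyncMerge
{
    // Merges several async sequences into one, yielding each item as soon as
    // any source produces it. One pump task per source writes into a shared channel.
    public static async IAsyncEnumerable<T> MergeAsync<T>(IEnumerable<IAsyncEnumerable<T>> sources)
    {
        var channel = Channel.CreateUnbounded<T>();

        // Start one pump per source; ToList forces all of them to start now.
        var pumps = sources.Select(async source =>
        {
            await foreach (var item in source)
            {
                await channel.Writer.WriteAsync(item);
            }
        }).ToList();

        // Complete the channel once every pump has finished, passing on the
        // first failure (if any) so that the reader observes it.
        _ = Task.WhenAll(pumps).ContinueWith(
            t => channel.Writer.TryComplete(t.Exception?.InnerException),
            TaskScheduler.Default);

        // ReadAllAsync yields items in arrival order and ends when the channel completes.
        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            yield return item;
        }
    }
}
```

It could stand in for SelectManyAsync in FindByIds above. Unlike the WhenAny loop, it does not rescan a task list for every item, but this sketch has no cancellation: if the consumer stops early, the pumps keep running to completion and buffer their items in the unbounded channel.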

Upvotes: 5
