Shaul Behr
Shaul Behr

Reputation: 38003

How to await all results from an IAsyncEnumerable<>?

I'm tinkering around with the new IAsyncEnumerable<T> stuff in C# 8.0. Let's say I've got some method somewhere that I want to consume:

public IAsyncEnumerable<T> SomeBlackBoxFunctionAsync<T>(...) { ... }

I'm aware that I can use it with the await foreach... syntax. But let's say my consumer needs to have all results from this function before it continues. What's the best syntax to await all results before continuing? In other words, I'd like to be able to do something like:

// but that extension - AllResultsAsync() - doesn't exist :-/
List<T> myList = await SomeBlackBoxFunctionAsync<T>().AllResultsAsync(); 

What's the correct way to do this?

Upvotes: 27

Views: 21876

Answers (2)

Panagiotis Kanavos
Panagiotis Kanavos

Reputation: 131334

A warning first: by definition, an async stream may never end and keep producing results until the application terminates. This is already used e.g. in SignalR or gRPC. Polling loops also work this way.

As such, using ToListAsync on an async stream may have unintended consequences!


Operators like this are already available through the System.Linq.Async package.

Consuming the entire stream is available through ToListAsync. The code is deceptively simple, but hides a few interesting issues :

public static ValueTask<List<TSource>> ToListAsync<TSource>(this IAsyncEnumerable<TSource>? source, CancellationToken cancellationToken = default)
{
    if (source is null)
        throw Error.ArgumentNull(nameof(source));

    if (source is IAsyncIListProvider<TSource> listProvider)
        return listProvider.ToListAsync(cancellationToken);

    return Core(source, cancellationToken);

    static async ValueTask<List<TSource>> Core(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
    {
        var list = new List<TSource>();

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            list.Add(item);
        }

        return list;
    }
}

First of all, it returns a ValueTask. Second, it ensures cancellation is observed and ConfigureAwait(false) is used, to prevent deadlocks. Finally, if the source already offers its own ToListAsync implementation via IAsyncIListProvider, the operator defers to that.

It's also interesting to note that while the IAsyncIListProvider interface is public, it's only implemented by internal and private classes within System.Linq.Async.

Upvotes: 34

Shaul Behr
Shaul Behr

Reputation: 38003

Based on @DmitryBychenko's comment, I wrote an extension to do want I want:

    public static async Task<ICollection<T>> AllResultsAsync<T>(this IAsyncEnumerable<T> asyncEnumerable)
    {
        if (null == asyncEnumerable)
            throw new ArgumentNullException(nameof(asyncEnumerable));  

        var list = new List<T>();
        await foreach (var t in asyncEnumerable)
        {
            list.Add(t);
        }

        return list;
    }

I'm just a little surprised this wasn't shipped natively with C# 8.0...it seems like a pretty obvious need.

Upvotes: 4

Related Questions