Eduard G

Reputation: 515

Combining IAsyncEnumerator and executing them asynchronously

The first function is designed to let LINQ execute lambda functions safely in parallel (even the async void ones).

So you can do collection.AsParallel().ForAllAsync(async x => await x.Action).

The second function is designed to let you combine and execute multiple IAsyncEnumerables in parallel and return their results as quickly as possible.

I have the following code:

    public static async Task ForAllAsync<TSource>(
        this ParallelQuery<TSource> source, 
        Func<TSource, Task> selector,
        int? maxDegreeOfParallelism = null)
    {
        int maxAsyncThreadCount = maxDegreeOfParallelism ?? Math.Min(System.Environment.ProcessorCount, 128);
        using SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

        IEnumerable<Task> tasks = source.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            
            try
            {
                await selector(input).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });

        await Task.WhenAll(tasks).ConfigureAwait(true);
    }

    public static async IAsyncEnumerable<T> ForAllAsync<TSource, T>(
        this ParallelQuery<TSource> source,
        Func<TSource, IAsyncEnumerable<T>> selector,
        int? maxDegreeOfParallelism = null,
        [EnumeratorCancellation]CancellationToken cancellationToken = default) 
        where T : new()
    {
        IEnumerable<(IAsyncEnumerator<T>, bool)> enumerators = 
            source.Select(x => (selector.Invoke(x).GetAsyncEnumerator(cancellationToken), true)).ToList();

        while (enumerators.Any())
        {
            await enumerators.AsParallel()
                .ForAllAsync(async e => e.Item2 = (await e.Item1.MoveNextAsync()), maxDegreeOfParallelism)
                .ConfigureAwait(false);
            foreach (var enumerator in enumerators)
            {
                yield return enumerator.Item1.Current;
            }
            enumerators = enumerators.Where(e => e.Item2);
        }
    }

I am using these functions to combine several IAsyncEnumerable functions that call API endpoints and are expected to return results of the same type.

The code somehow continues to return results after the iterators have reached the end. Why?

Upvotes: 0

Views: 1210

Answers (1)

Theodor Zoulias

Reputation: 43474

The type (IAsyncEnumerator<T>, bool) is shorthand for the ValueTuple<IAsyncEnumerator<T>, bool> type, which is a value type. This means that on assignment it is not passed by reference; it is copied. So this lambda does not work as intended:

    async e => e.Item2 = (await e.Item1.MoveNextAsync())

Instead of changing the bool part of the entry stored in the list, it changes the value of a temporary copy, so the change is not preserved.
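The copy semantics can be seen in isolation with a minimal sketch. The class name ValueTupleCopyDemo and the tuple element names Name and Active are made up for illustration and are not part of the code in the question:

```csharp
using System;
using System.Collections.Generic;

public static class ValueTupleCopyDemo
{
    // Returns the flag stored in the list after trying to clear it through a lambda.
    public static bool Run()
    {
        var entries = new List<(string Name, bool Active)> { ("a", true) };

        // The lambda parameter 'e' is a copy of the list element,
        // so the assignment only changes that local copy.
        Action<(string Name, bool Active)> deactivate = e => e.Active = false;
        deactivate(entries[0]);

        // The entry stored in the list is untouched.
        return entries[0].Active;
    }
}
```

Calling ValueTupleCopyDemo.Run() returns true, because the entry inside the list was never modified.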

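To make it work as intended you must either store the entries in a mutable reference type (a small class with a settable bool property; note that Tuple<IAsyncEnumerator<T>, bool> is a reference type, but its Item2 is read-only), or replace the whole entry in the list: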

    List<(IAsyncEnumerator<T>, bool)> enumerators = source./*...*/.ToList();
    //...
    var entry = enumerators[index];
    enumerators[index] = (entry.Item1, await entry.Item1.MoveNextAsync());

Be aware that the List<T> class is not thread-safe, so in order to update it safely from multiple threads concurrently you must protect it with a lock.
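As a rough sketch of how the two points could be combined (replacing the whole entry, and guarding the list with a lock), something along these lines might work. The names LockedListUpdateDemo, AdvanceAllAsync, CountTo and RunAsync are hypothetical, throttling is left out, and this is not a drop-in replacement for the method in the question:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class LockedListUpdateDemo
{
    // Hypothetical helper: advances every enumerator concurrently and stores
    // the outcome by replacing the whole tuple in the list under a lock.
    public static async Task AdvanceAllAsync<T>(
        List<(IAsyncEnumerator<T> Enumerator, bool HasCurrent)> entries)
    {
        var gate = new object();

        var tasks = Enumerable.Range(0, entries.Count).Select(async index =>
        {
            IAsyncEnumerator<T> enumerator;
            lock (gate) { enumerator = entries[index].Enumerator; }

            // The await happens outside the lock; a lock block cannot contain an await.
            bool moved = await enumerator.MoveNextAsync().ConfigureAwait(false);

            lock (gate) { entries[index] = (enumerator, moved); }
        });

        await Task.WhenAll(tasks).ConfigureAwait(false);
    }

    // Hypothetical source used only to exercise the helper.
    private static async IAsyncEnumerable<int> CountTo(int n)
    {
        for (int i = 1; i <= n; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Advances one empty and one non-empty sequence once and reports the flags.
    public static async Task<List<bool>> RunAsync()
    {
        var entries = new[] { 0, 2 }
            .Select(n => (Enumerator: CountTo(n).GetAsyncEnumerator(), HasCurrent: true))
            .ToList();

        await AdvanceAllAsync(entries);
        var flags = entries.Select(e => e.HasCurrent).ToList();

        foreach (var entry in entries)
        {
            await entry.Enumerator.DisposeAsync();
        }
        return flags;
    }
}
```

After one round, the flag for the empty sequence is false and the flag for the non-empty one is true, so the calling loop can drop finished enumerators and read Current only from entries whose flag is true.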

Upvotes: 1
