Reputation: 515
The first function is designed to enable linq to execute lambda functions safely in parallel (even the async void ones).
So you can do collection.AsParallel().ForAllASync(async x => await x.Action).
The second function is designed to enable you to combine and execute multiple IAsyncEnumerables in parallel and return their results as quick as possible.
I have the following code:
public static async Task ForAllAsync<TSource>(
this ParallelQuery<TSource> source,
Func<TSource, Task> selector,
int? maxDegreeOfParallelism = null)
{
int maxAsyncThreadCount = maxDegreeOfParallelism ?? Math.Min(System.Environment.ProcessorCount, 128);
using SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);
IEnumerable<Task> tasks = source.Select(async input =>
{
await throttler.WaitAsync().ConfigureAwait(false);
try
{
await selector(input).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks).ConfigureAwait(true);
}
public static async IAsyncEnumerable<T> ForAllAsync<TSource, T>(
this ParallelQuery<TSource> source,
Func<TSource, IAsyncEnumerable<T>> selector,
int? maxDegreeOfParallelism = null,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
where T : new()
{
IEnumerable<(IAsyncEnumerator<T>, bool)> enumerators =
source.Select(x => (selector.Invoke(x).GetAsyncEnumerator(cancellationToken), true)).ToList();
while (enumerators.Any())
{
await enumerators.AsParallel()
.ForAllAsync(async e => e.Item2 = (await e.Item1.MoveNextAsync()), maxDegreeOfParallelism)
.ConfigureAwait(false);
foreach (var enumerator in enumerators)
{
yield return enumerator.Item1.Current;
}
enumerators = enumerators.Where(e => e.Item2);
}
}
The code somehow continues to return results after the iterators have reached the end.
I am using these functions to combine several threads of IAsyncEnumerable functions that call on API endpoints excepting results of the same type.
Why?
Upvotes: 0
Views: 1210
Reputation: 43474
The type (IAsyncEnumerator<T>, bool)
is a shorthand of the ValueTuple<IAsyncEnumerator<T>, bool>
type, which is a value type. This means that on assignement it's not passed by reference, and instead it's copied. So this lambda does not work as intended:
async e => e.Item2 = (await e.Item1.MoveNextAsync())
Instead of changing the bool
part of the entry stored in the list, it changes the value of a temporary copy, so the change is not preserved.
To make it work as intended you must either switch to reference type tuples (Tuple<IAsyncEnumerator<T>, bool>
), or replace the whole entry in the list:
List<(IAsyncEnumerator<T>, bool)> enumerators = source./*...*/.ToList()
//...
var entry = enumerators[index];
enumerators[index] = (entry.Item1, await entry.Item1.MoveNextAsync());
Be aware that the List<T>
class is not thread-safe, so in order to update it safely from multiple threads concurrently you must protect it with a lock
.
Upvotes: 1