William Jockusch

Reputation: 27295

Factory for IAsyncEnumerable or IAsyncEnumerator

I'm wondering if there is a way to create either IAsyncEnumerable<T> or IAsyncEnumerator<T> via a Source object, rather like TaskCompletionSource allows one to do for tasks. In particular, TaskCompletionSource can be passed around like any other parameter.

Maybe something like this:

public class AsyncEnumerables {

    public Task HandlerTask { get; set; }

    public async Task<string> ParentMethod() {
        var source = new AsyncEnumerableSource<int>(); // the hypothetical class being asked about
        IAsyncEnumerable<int> asyncEnumerable = source.GetAsyncEnumerable();
        HandlerTask = Task.Run(() => handleAsyncResultsAsTheyHappen(asyncEnumerable));
        int n = await someOtherTask();
        source.YieldReturn(n);
        var r = await ChildMethod(source);
        source.Complete();  // this call would cause the HandlerTask to complete.
        return r;
    }

    private async Task<string> ChildMethod(AsyncEnumerableSource<int> source) {
        source.YieldReturn(5);
        await SomeOtherCall();
        source.YieldReturn(10);
        return "hello";
    }
}

With the above code, the handleAsyncResultsAsTheyHappen task would see whatever values were passed into YieldReturn: the n from ParentMethod, as well as the 5 and the 10 from ChildMethod.

Upvotes: 7

Views: 4248

Answers (3)

Theodor Zoulias

Reputation: 43474

Here is another implementation of the AsyncEnumerableSource class that doesn't depend on the Rx library. Instead, it depends on the Channel<T> class, which is natively available in the standard .NET libraries (namespace System.Threading.Channels). Its behavior is identical to that of the Rx-based implementation.

The AsyncEnumerableSource class can propagate notifications to multiple subscribers, and each subscriber can enumerate these notifications at its own pace. This is possible because each subscription has its own dedicated Channel<T> as underlying storage. The lifetime of a subscription is practically tied to the lifetime of a single await foreach loop: breaking early from the loop for any reason (including thrown exceptions) immediately ends the subscription.
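The per-subscriber storage idea can be tried in isolation. The following is a minimal sketch, not the class itself: it assumes a hypothetical list of two unbounded channels and a hypothetical Drain helper, writes every value to every channel (as the YieldReturn method of the class does), and lets each subscriber drain its own channel with a different simulated delay. Both subscribers end up with the full sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical subscriber list: one dedicated unbounded channel per subscriber.
var subscribers = new List<Channel<int>>
{
    Channel.CreateUnbounded<int>(),
    Channel.CreateUnbounded<int>(),
};

// Broadcast: every value goes to every subscriber's channel.
foreach (var value in new[] { 1, 2, 3 })
    foreach (var ch in subscribers) ch.Writer.TryWrite(value);
foreach (var ch in subscribers) ch.Writer.TryComplete();

// Hypothetical helper: a subscriber drains its own channel independently.
async Task<List<int>> Drain(Channel<int> ch, int delayMs)
{
    var seen = new List<int>();
    await foreach (var item in ch.Reader.ReadAllAsync())
    {
        seen.Add(item);
        await Task.Delay(delayMs); // simulated per-item processing time
    }
    return seen;
}

List<int>[] results = await Task.WhenAll(Drain(subscribers[0], 0), Drain(subscribers[1], 10));
Console.WriteLine(string.Join(" | ", results.Select(r => string.Join(",", r))));
```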

In technical terms, a new subscription is created the first time that the MoveNextAsync method of an IAsyncEnumerator<T> is invoked. Neither calling the GetAsyncEnumerable method alone nor calling the GetAsyncEnumerator method creates a subscription. The subscription ends when the associated IAsyncEnumerator<T> is disposed.
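This lazy behavior comes from how C# async iterators are compiled: the body of an async IAsyncEnumerable<T> method (including, in the class below, the lock block that creates and registers the channel) does not start running until the first MoveNextAsync call. A minimal sketch, using a hypothetical Subscribe local function whose counter stands in for the channel registration:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical counter standing in for "a channel was added to _channels".
int subscriptions = 0;

async IAsyncEnumerable<int> Subscribe()
{
    subscriptions++;   // runs only when MoveNextAsync is called for the first time
    await Task.Yield();
    yield return 42;
}

IAsyncEnumerable<int> enumerable = Subscribe();
int afterGetEnumerable = subscriptions;        // 0: the body has not started

IAsyncEnumerator<int> enumerator = enumerable.GetAsyncEnumerator();
int afterGetEnumerator = subscriptions;        // still 0

bool hasItem = await enumerator.MoveNextAsync();
int afterMoveNext = subscriptions;             // 1: the body ran up to the yield
await enumerator.DisposeAsync();               // this is where a finally block would run

Console.WriteLine($"{afterGetEnumerable} {afterGetEnumerator} {afterMoveNext} {hasItem}");
```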

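using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

public class AsyncEnumerableSource<T>
{
    // One dedicated channel per active subscriber; the list also serves as the lock.
    private readonly List<Channel<T>> _channels = new();
    private bool _completed;
    private Exception _exception;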

    public async IAsyncEnumerable<T> GetAsyncEnumerable(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        Channel<T> channel;
        lock (_channels)
        {
            if (_exception != null) throw _exception;
            if (_completed) yield break;
            channel = Channel.CreateUnbounded<T>(
                new() { SingleWriter = true, SingleReader = true });
            _channels.Add(channel);
        }
        try
        {
            await foreach (var item in channel.Reader.ReadAllAsync()
                .WithCancellation(cancellationToken).ConfigureAwait(false))
            {
                yield return item;
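                // ReadAllAsync may not observe the token mid-iteration (see the note below).
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
        // Unsubscribe when the enumerator is disposed (loop completed, broken, or faulted).
        finally { lock (_channels) _channels.Remove(channel); }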
    }

    public void YieldReturn(T value)
    {
        lock (_channels)
        {
            if (_completed) return;
            foreach (var channel in _channels) channel.Writer.TryWrite(value);
        }
    }

    public void Complete()
    {
        lock (_channels)
        {
            if (_completed) return;
            foreach (var channel in _channels) channel.Writer.TryComplete();
            _completed = true;
        }
    }

    public void Fault(Exception error)
    {
        lock (_channels)
        {
            if (_completed) return;
            foreach (var channel in _channels) channel.Writer.TryComplete(error);
            _completed = true;
            _exception = error;
        }
    }
}

The cancellationToken.ThrowIfCancellationRequested() call is there because of this issue: ChannelReader.ReadAllAsync(CancellationToken) not actually cancelled mid-iteration.
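A minimal sketch of why the explicit check is useful, assuming all items are already buffered in the channel before the loop starts: cancellation is requested inside the loop body, and the ThrowIfCancellationRequested call ends the loop right after the first item, independently of whether ReadAllAsync itself would observe the token at that point.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;

// All three items are buffered before enumeration starts.
var channel = Channel.CreateUnbounded<int>();
for (int i = 1; i <= 3; i++) channel.Writer.TryWrite(i);
channel.Writer.TryComplete();

using var cts = new CancellationTokenSource();
var received = new List<int>();
bool canceled = false;
try
{
    await foreach (var item in channel.Reader.ReadAllAsync(cts.Token))
    {
        received.Add(item);
        cts.Cancel(); // cancel while items 2 and 3 are still buffered
        // Explicit check, as in the answer's loop: don't rely on ReadAllAsync
        // to notice the token while items are synchronously available.
        cts.Token.ThrowIfCancellationRequested();
    }
}
catch (OperationCanceledException)
{
    canceled = true;
}
Console.WriteLine($"{received.Count} {canceled}");
```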

Caution: if you start propagating values with YieldReturn before any consumer has subscribed to the AsyncEnumerableSource, these values will be lost; no subscriber will observe them. To prevent this, make sure that all consumers have subscribed before starting the producers. The easiest way to do this is to make the consumers async methods, with the await foreach being the first await inside the method:

// Correct, synchronous subscription
async Task Consume()
{
    await foreach (var item in source.GetAsyncEnumerable())
    {
        //...
    }
}
Task consumer = Consume();

Avoid the temptation to use the Task.Run method here, because then the subscription occurs asynchronously on a ThreadPool thread, not synchronously with the creation of the consumer:

// Wrong, delayed subscription (possibility for unobserved values)
Task consumer = Task.Run(async () =>
{
    await foreach (var item in source.GetAsyncEnumerable())
    {
        //...
    }
});

If you don't want to subscribe synchronously, you can offload the subscriptions to the ThreadPool and await until they are established before starting the producers:

// Correct, awaited subscription
Task consumer = await Task.Factory.StartNew(async () =>
{
    HeavySynchronousComputation();
    await foreach (var item in source.GetAsyncEnumerable())
    {
        //...
    }
}, default, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

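Calling Task.Factory.StartNew with an async delegate creates a nested Task<Task>. The outer task completes when the delegate reaches its first incomplete await, which here happens after the await foreach has established the subscription, so the outer task represents the subscription. The inner task represents the consuming loop.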
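A minimal sketch of the nested-task behavior, using a hypothetical gate (a TaskCompletionSource<bool>) in place of the pending await foreach loop and a hypothetical subscribed flag in place of the subscription: awaiting the outer task returns the inner task as soon as the async delegate reaches its first incomplete await, while the inner task keeps running until the gate opens.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical gate standing in for the pending await foreach loop.
var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
bool subscribed = false;

Task<Task> outer = Task.Factory.StartNew(async () =>
{
    subscribed = true;   // stands in for the subscription inside await foreach
    await gate.Task;     // first incomplete await: the delegate returns here
}, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

Task inner = await outer;                      // the outer task is done at this point
bool innerCompletedEarly = inner.IsCompleted;  // false: still waiting on the gate

gate.SetResult(true);                          // let the "consuming loop" finish
await inner;
Console.WriteLine($"{subscribed} {innerCompletedEarly} {inner.Status}");
```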

Upvotes: 5

Theodor Zoulias

Reputation: 43474

AFAIK the .NET platform has no built-in AsyncEnumerableSource class, but it is easy to implement one by using the System.Reactive and System.Linq.Async libraries. The System.Reactive library contains the Subject<T> class, which is both an IObservable<T> and an IObserver<T>. This is convenient, because you can send notifications to the IObserver<T> interface and independently subscribe any number of times to the IObservable<T> interface to receive these notifications back. Actually, you don't even need to subscribe manually, because the System.Linq.Async library contains the handy extension method ToAsyncEnumerable, which converts an IObservable<T> to an IAsyncEnumerable<T> automatically.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;

public class AsyncEnumerableSource<T>
{
    private readonly Subject<T> _subject = new Subject<T>();

    public IAsyncEnumerable<T> GetAsyncEnumerable() => _subject.ToAsyncEnumerable();
    public void YieldReturn(T value) => _subject.OnNext(value);
    public void Complete() => _subject.OnCompleted();
    public void Fault(Exception ex) => _subject.OnError(ex);
}

This implementation sends to each subscriber only the notifications that occurred after its subscription. If you want late joiners to also receive the earlier messages, you could replace the Subject<T> with a ReplaySubject<T>. The ReplaySubject<T> buffers the notifications it receives, so it comes with memory usage considerations; to bound the buffer, it accepts an int bufferSize argument in its constructor.


Note: The above implementation is thread-safe, even though the Subject<T> class is not synchronized and, in general, calling OnNext from multiple threads in parallel breaks the Rx contract. That's because the ToAsyncEnumerable operator does not depend on the Rx contract for its correctness, and it synchronizes the incoming notifications. It's not a particularly efficient implementation, though; Channel<T>-based implementations are significantly more efficient under heavy load.

Upvotes: 4

Stephen Cleary

Reputation: 456447

You're much better off if you can structure your code to take advantage of yield return and await foreach. E.g., this code does almost the same thing:

// HandlerTask is the property from the question's AsyncEnumerables class.
public void Consume()
{
    var source = ParentMethod();
    HandlerTask = Task.Run(async () =>
    {
        await foreach (var item in source)
            Console.WriteLine(item);
    });
}

public async IAsyncEnumerable<int> ParentMethod()
{
    await Task.Yield();
    yield return 13;
    await foreach (var item in ChildMethod())
        yield return item;
}

private async IAsyncEnumerable<int> ChildMethod()
{
    yield return 5;
    await Task.Yield();
    yield return 10;
}

However, if you really need an "async enumerable source", you need to first recognize one thing. TaskCompletionSource<T> holds the results, i.e., the T (or exception). It's acting as a container. The result can be set before the task is awaited. It's the same thing with the "async enumerable source" - you'd need it to be able to hold results before any items are taken from it. The "async enumerable source" would need to hold multiple results - in this case, a collection.
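The container behavior of TaskCompletionSource<T> is easy to observe. A minimal sketch with a hypothetical Produce helper: the source is passed as an ordinary parameter, its result is set before anyone awaits the task, and the stored result is observed later.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: receives the source as an ordinary parameter and completes it.
static void Produce(TaskCompletionSource<int> source) => source.SetResult(42);

var tcs = new TaskCompletionSource<int>();
Produce(tcs);                                  // the result is stored now...
bool storedBeforeAwait = tcs.Task.IsCompleted;
int result = await tcs.Task;                   // ...and observed later
Console.WriteLine($"{storedBeforeAwait} {result}");
```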

So what you're actually asking for is "a collection that can be consumed as an asynchronous enumerable". There are a few possibilities here, but the one I'd recommend is a Channel:

using System.Threading.Channels;

public async Task<string> ParentMethod()
{
  var source = Channel.CreateUnbounded<int>();
  var sourceWriter = source.Writer;
  IAsyncEnumerable<int> asyncEnumerable = source.Reader.ReadAllAsync();
  // HandlerTask is the property from the question's AsyncEnumerables class.
  HandlerTask = Task.Run(async () =>
  {
    await foreach (var item in asyncEnumerable)
      Console.WriteLine(item);
  });
  await Task.Yield();
  await sourceWriter.WriteAsync(13);
  var r = await ChildMethod(sourceWriter);
  sourceWriter.Complete();
  return r;
}

private async Task<string> ChildMethod(ChannelWriter<int> sourceWriter)
{
  await sourceWriter.WriteAsync(5);
  await Task.Yield();
  await sourceWriter.WriteAsync(10);
  return "hello";
}
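One design difference from the AsyncEnumerableSource<T> in the first answer is worth keeping in mind: a single Channel<T> behaves like a queue, so if several consumers enumerate the same ChannelReader<T>, each item is delivered to only one of them. That is why the first answer creates a dedicated channel per subscription. A minimal sketch with a hypothetical Drain helper and two readers of one channel:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
for (int i = 0; i < 100; i++) channel.Writer.TryWrite(i);
channel.Writer.TryComplete();

// Hypothetical helper: two calls read from the SAME channel and compete for items.
async Task<List<int>> Drain()
{
    var seen = new List<int>();
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        seen.Add(item);
        await Task.Yield(); // give the other reader a chance to take items
    }
    return seen;
}

List<int>[] results = await Task.WhenAll(Drain(), Drain());
int total = results[0].Count + results[1].Count;
bool noDuplicates = !results[0].Intersect(results[1]).Any();
Console.WriteLine($"{total} {noDuplicates}");
```

How the 100 items are split between the two readers depends on scheduling; only the total and the absence of duplicates are guaranteed.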

Upvotes: 4
