allmhuran

Reputation: 4464

Exposing a push via callback message broker as an IAsyncEnumerable

I am working with a third party library which acts as an interface to a pub-sub message broker. The broker is Solace PubSub+.

For subscribers, the vendor library employs a "push messages via callback" pattern.

I am writing my own wrapper library around the vendor library to make it easier for other devs to work with (hiding all of the internals of how the library communicates with the network and so on).

In that same vein, I think it might be helpful to expose a subscriber feed as an IAsyncEnumerable, and I think this might be a good use case for System.Threading.Channels. I have two concerns:

  1. Are channels appropriate here, or am I overengineering this? I.e., is there a more "C# idiomatic" way to wrap callbacks?
  2. Is my EnumerableBroker wrapper implementation safe, or have I fallen into an async trap somewhere?

I realise the first question might be a better fit for CodeReview than SO, but since the answer to that also ties in with the second concern, it seems appropriate to put them together. Worth noting: I am avoiding IObservable / Rx, since my goal is to make my interface more basic than the vendor's, not to require that the other devs and myself learn Rx! Understanding how the producer and consumer processes are independent is also trivial with the channel in the middle, whereas with an observable my first mental process is "Ok, so are the producer and the consumer still independent? At first glance it looks like I have to learn about schedulers now... gosh, how about I just use an await foreach?"

Here's a minimal mockup of consuming messages without the EnumerableBroker:

// mockup of third party class
private class Broker
{
    // mockup of how the third party library pushes messages via callback
    public void Subscribe(EventHandler<int> handler) => this.handler = handler;

    //simulate the broker pushing messages. Not "real" code
    public void Start()
    {
        Task.Run
        (
            () =>
            {
                for (int i = 0; !cts.Token.IsCancellationRequested; i++)
                {
                    // simulate internal latency
                    Thread.Sleep(10);
                    handler?.Invoke(this, i);
                }
            }, cts.Token
        );
    }

    public void Stop() => cts.Cancel();

    private CancellationTokenSource cts = new();
    private EventHandler<int> handler;
}

private static async Task Main()
{
    var broker = new Broker();
    broker.Subscribe((_, msg) => Console.WriteLine(msg));
    broker.Start();
    await Task.Delay(1000);
    broker.Stop();
}

And now with a minimal reproduction of the EnumerableBroker (still using the same mock Broker class listed above). At least one benefit here seems to be that if the subscriber needs to do a lot of work to process a message, it doesn't tie up the broker's thread - at least until the buffer fills up. This seems to work without error, but I've learned to be wary of my limited grasp of async.

private class EnumerableBroker
{
    public EnumerableBroker(int bufferSize = 8)
    {
        buffer = Channel.CreateBounded<int>
        (
            new BoundedChannelOptions(bufferSize) { SingleReader = true,
                SingleWriter = true }
        );
    }

    public IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
    {
        broker.Subscribe
        (
            // switched to sync per Theodor's comments
            (_, args) => buffer.Writer.WriteAsync(args, ct).AsTask().Wait()
        );
        ct.Register(broker.Stop);
        broker.Start();
        return buffer.Reader.ReadAllAsync(ct);
    }

    private readonly Channel<int> buffer;
    private readonly Broker broker = new();
}

private static async Task Main()
{
    var cts = new CancellationTokenSource();
    var broker = new EnumerableBroker();
    cts.CancelAfter(1000);
    try
    {
        await foreach (var msg in broker.ReadAsync(cts.Token))
        {
            Console.WriteLine(msg);
        }
    }
    catch (OperationCanceledException) { }
}

Upvotes: 2

Views: 859

Answers (2)

Panagiotis Kanavos

Reputation: 131676

This is overengineered only in the sense that using Channels doesn't need so much code. A typical pattern is to use just methods that accept a ChannelReader as input and return a ChannelReader as output, with the method itself creating and owning the output channel. This makes composing stages into a pipeline very easy, especially if those methods are extension methods.

In this case, your code could be rewritten as:

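static ChannelReader<int> ToChannel(this Broker broker,
    int limit, CancellationToken token = default)
{
    var channel = Channel.CreateBounded<int>(limit);
    var writer = channel.Writer;

    // TryWrite takes only the item (no token) and never blocks;
    // it returns false when the bounded buffer is full.
    broker.Subscribe((_, args) => {
        writer.TryWrite(args);
    });
    // Complete the channel on cancellation so that readers can finish.
    token.Register(() => writer.TryComplete());

    return channel.Reader;
}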

This will lose any messages past the limit. If your Broker understands Tasks, you could use:

broker.Subscribe(async (_, args) =>{
        await writer.WriteAsync(args, token);
    });

If it doesn't understand tasks, and you can't afford to lose anything, perhaps a better solution would be to use an unbounded channel and handle pause/resume in a later stage. You've already asked a similar question.
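A rough sketch of the unbounded option, under stated assumptions: the `ToUnboundedChannel` name and the `subscribe` delegate parameter are hypothetical stand-ins for the broker's callback registration, and any pause/resume logic is left to a later stage as suggested above.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;

public static class UnboundedAdapterSketch
{
    // "subscribe" is a hypothetical stand-in for broker.Subscribe.
    public static ChannelReader<int> ToUnboundedChannel(
        Action<EventHandler<int>> subscribe, CancellationToken token = default)
    {
        var channel = Channel.CreateUnbounded<int>();
        var writer = channel.Writer;

        // On an unbounded channel TryWrite only fails once the channel
        // has been completed, so the callback never blocks or drops.
        subscribe((_, msg) => writer.TryWrite(msg));

        // Complete the channel on cancellation so that readers can finish.
        token.Register(() => writer.TryComplete());

        return channel.Reader;
    }
}
```

The trade-off is memory: nothing bounds the queue if the consumer falls behind.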

Otherwise, you'll have to block the callback:

broker.Subscribe((_, args) =>{
       // Blocks the broker's callback thread until there is room in the channel.
       writer.WriteAsync(args, token).AsTask().Wait();
    });

That's not an ideal solution though.

In every case, you can consume the data through the returned reader:

var token=cts.Token;
var reader=broker.ToChannel(10,token);

await foreach(var item in reader.ReadAllAsync(token))
{
...
}

Upvotes: 1

Theodor Zoulias

Reputation: 43836

Am I overengineering this?

No. A Channel is exactly the kind of component you need in order to implement this functionality. It's a quite simple mechanism. It's basically an async version of the BlockingCollection<T> class, with some extra features (like the Completion property), and a fancy API (the Reader and Writer facades).

Is my EnumerableBroker wrapper implementation safe, or have I fallen into an async trap somewhere?

Yes, there is a trap, and you have fallen into it. The SingleWriter = true configuration means that at most one WriteAsync operation is allowed to be in flight at a time. Before issuing the next WriteAsync, the previous one must have completed. By subscribing to the broker with an async void delegate, you are essentially creating a separate writer (producer) for each message pushed by the broker. Most probably the component will complain about this misuse by throwing an InvalidOperationException or something similar.

The solution is not to switch to SingleWriter = false though. That would just circumvent the bounded capacity of the Channel, by creating an external (and highly inefficient) queue of messages that don't fit in the internal queue of the Channel. The solution is to rethink your buffering strategy. If you can't afford to buffer an unlimited number of messages, you must either drop messages, or throw an exception and kill the consumer. Instead of await buffer.Writer.WriteAsync, it's better to feed the channel synchronously with bool accepted = buffer.Writer.TryWrite, and take an appropriate action when accepted is false.
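As a minimal sketch of the synchronous approach, the callback could look something like the following. The `CreateHandler` helper and the `onDropped` callback are hypothetical names introduced for illustration; what to do with a rejected message (log, count, throw) is up to you.

```csharp
using System;
using System.Threading.Channels;

public static class TryWriteSketch
{
    // Hypothetical helper: builds the callback that is handed to the broker.
    public static EventHandler<int> CreateHandler(
        ChannelWriter<int> writer, Action<int> onDropped)
    {
        return (_, msg) =>
        {
            // TryWrite never blocks. On a bounded channel with the default
            // full mode it returns false when the buffer is full, and it
            // also returns false once the channel has been completed.
            bool accepted = writer.TryWrite(msg);
            if (!accepted)
            {
                onDropped(msg);
            }
        };
    }
}
```

Because the callback returns immediately, only one write is ever in progress, which is consistent with SingleWriter = true as long as the broker invokes the callback from one thread at a time.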

Another consideration that you should have in mind is that the ChannelReader.ReadAllAsync method is consuming. This means that if you have multiple readers/consumers of the same channel, each message will be delivered to only one of the consumers. In other words each consumer will receive a partial subset of the channel's messages. You should communicate this to your coworkers, because it's quite trivial to enumerate the same IAsyncEnumerable<T> more than once. After all an IAsyncEnumerable<T> is nothing more than a factory of IAsyncEnumerator<T>s.
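To illustrate the consuming behavior, here is a small sketch (the class and method names are made up for the example) that enumerates the same sequence twice. The first loop drains the channel, so the second loop finds nothing left to read.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ConsumingReadSketch
{
    public static async Task<(List<int> First, List<int> Second)> EnumerateTwiceAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        for (int i = 0; i < 3; i++)
        {
            channel.Writer.TryWrite(i);
        }
        channel.Writer.Complete();

        // One IAsyncEnumerable<int>, enumerated twice.
        IAsyncEnumerable<int> feed = channel.Reader.ReadAllAsync();

        var first = new List<int>();
        await foreach (var msg in feed)
        {
            first.Add(msg);   // removes the message from the channel
        }

        var second = new List<int>();
        await foreach (var msg in feed)
        {
            second.Add(msg);  // never runs: the channel is already empty
        }

        return (first, second);
    }
}
```

With two concurrent loops instead of two sequential ones, the messages would be split between them rather than duplicated.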

Finally, instead of controlling the lifetime of each subscription by a CancellationToken, you can make your coworkers' lives easier by just terminating a subscription automatically when the enumeration of an IAsyncEnumerator<T> terminates. When an await foreach loop ends in any way (like by break or by an exception), the associated IAsyncEnumerator<T> is automatically disposed. The C# language has cleverly hooked the DisposeAsync invocation with the finally block of the iterator, if a try/finally block wraps the yielding loop. You could take advantage of this great feature like this:

public async IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
{
    broker.Subscribe
    (
        //...
    );
    broker.Start();
    try
    {
        await foreach (var msg in buffer.Reader.ReadAllAsync(ct))
        {
            yield return msg;
        }
    }
    finally
    {
        broker.Stop();
    }
}
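The cleanup behavior can be checked in isolation with a sketch like the one below. It is not the wrapper itself: the `Wrap` method and the `onStop` callback are hypothetical stand-ins for ReadAsync and broker.Stop, so that the example runs without the broker.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class IteratorCleanupSketch
{
    // Same shape as ReadAsync above; onStop stands in for broker.Stop.
    public static async IAsyncEnumerable<int> Wrap(
        ChannelReader<int> reader,
        Action onStop,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        try
        {
            await foreach (var msg in reader.ReadAllAsync(ct))
            {
                yield return msg;
            }
        }
        finally
        {
            // Runs when the consumer's loop ends for any reason,
            // because await foreach disposes the enumerator.
            onStop();
        }
    }

    public static async Task<bool> BreakEarlyAsync()
    {
        var stopped = false;
        var channel = Channel.CreateUnbounded<int>();
        channel.Writer.TryWrite(1);
        channel.Writer.TryWrite(2);

        await foreach (var msg in Wrap(channel.Reader, () => stopped = true))
        {
            break; // leave after the first message, channel still open
        }

        return stopped;
    }
}
```

The `[EnumeratorCancellation]` attribute is optional here. It lets callers supply the token through `WithCancellation` as well as through the parameter.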

Upvotes: 6
