Reputation: 4464
I am working with a third party library which acts as an interface to a pub-sub message broker. The broker is Solace PubSub+.
For subscribers, the vendor library employs a "push messages via callback" pattern.
I am writing my own wrapper library around the vendor library to make it easier for other devs to work with (hiding all of the internals of how the library communicates with the network and so on).
In that same vein, I think it might be helpful to expose a subscriber feed as an IAsyncEnumerable, and this seems like a good use case for System.Threading.Channels. I have two concerns:

1. Am I overengineering this?
2. Is my EnumerableBroker wrapper implementation safe, or have I fallen into an async trap somewhere?

I realise the first question might be a better fit for Code Review than SO, but since the answer to that also ties in with the second concern, it seems appropriate to put them together. Worth noting: I am avoiding IObservable / Rx, since my goal is to make my interface more basic than the vendor's, not to require that the other devs and myself learn Rx! Understanding how the producer and the consumer are independent is also trivial with the channel in the middle, whereas with an observable my first mental process is "OK, so are the producer and the consumer still independent? At first glance it looks like I have to learn about schedulers now... gosh, how about I just use an await foreach?"
Here's a minimal mockup of consuming messages without the EnumerableBroker:
// mockup of third party class
private class Broker
{
    // mockup of how the third party library pushes messages via callback
    public void Subscribe(EventHandler<int> handler) => this.handler = handler;

    // simulate the broker pushing messages. Not "real" code
    public void Start()
    {
        Task.Run
        (
            () =>
            {
                for (int i = 0; !cts.Token.IsCancellationRequested; i++)
                {
                    // simulate internal latency
                    Thread.Sleep(10);
                    handler?.Invoke(this, i);
                }
            }, cts.Token
        );
    }

    public void Stop() => cts.Cancel();

    private readonly CancellationTokenSource cts = new();
    private EventHandler<int> handler;
}
private static async Task Main()
{
    var broker = new Broker();
    broker.Subscribe((_, msg) => Console.WriteLine(msg));
    broker.Start();
    await Task.Delay(1000);
    broker.Stop();
}
And now with a minimal reproduction of the EnumerableBroker (still using the same mock Broker class listed above). At least one benefit here seems to be that if the subscriber needs to do a lot of work to process a message, it doesn't tie up the broker's thread - at least until the buffer fills up. This seems to work without error, but I've learned to be wary of my limited grasp of async.
private class EnumerableBroker
{
    public EnumerableBroker(int bufferSize = 8)
    {
        buffer = Channel.CreateBounded<int>
        (
            new BoundedChannelOptions(bufferSize) { SingleReader = true, SingleWriter = true }
        );
    }

    public IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
    {
        broker.Subscribe
        (
            // switched to sync per Theodor's comments
            (_, args) => buffer.Writer.WriteAsync(args, ct).AsTask().Wait()
        );
        ct.Register(broker.Stop);
        broker.Start();
        return buffer.Reader.ReadAllAsync(ct);
    }

    private readonly Channel<int> buffer;
    private readonly Broker broker = new();
}
private static async Task Main()
{
    var cts = new CancellationTokenSource();
    var broker = new EnumerableBroker();
    cts.CancelAfter(1000);
    try
    {
        await foreach (var msg in broker.ReadAsync(cts.Token))
        {
            Console.WriteLine(msg);
        }
    }
    catch (OperationCanceledException) { }
}
Upvotes: 2
Views: 859
Reputation: 131676
This is overengineered only in the sense that using Channels doesn't need so much code. A typical pattern is to use methods that accept a ChannelReader as input and return a ChannelReader as output, with the method itself creating and owning the output channel. This makes composing stages into a pipeline very easy, especially if those methods are extension methods.
In this case, your code could be rewritten as:
static ChannelReader<int> ToChannel(this Broker broker,
    int limit, CancellationToken token = default)
{
    var channel = Channel.CreateBounded<int>(limit);
    var writer = channel.Writer;
    broker.Subscribe((_, args) =>
    {
        // TryWrite takes only the item; it returns false if the buffer is full
        writer.TryWrite(args);
    });
    token.Register(() => writer.Complete());
    return channel;
}
This will lose any messages past the limit. If your Broker understands Tasks, you could use:
broker.Subscribe(async (_, args) =>
{
    await writer.WriteAsync(args, token);
});
If it doesn't understand tasks, and you can't afford to lose anything, perhaps a better solution would be to use an unbounded channel and handle pause/resume in a later stage. You've already asked a similar question.
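A minimal sketch of that unbounded-then-pause/resume idea, under my own assumptions (the Buffered helper and the stage layout are illustrative, not from any library): the callback feeds an unbounded channel with TryWrite, which never blocks and never drops, and a later stage moves items into a bounded channel where awaiting is allowed.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Stage 1: an unbounded channel fed synchronously. TryWrite on an
// unbounded channel always succeeds, so the broker's thread is never
// blocked and nothing is lost at this stage.
var unbounded = Channel.CreateUnbounded<int>();
for (int i = 0; i < 100; i++) unbounded.Writer.TryWrite(i); // stand-in for the callback
unbounded.Writer.Complete();

// Stage 2: a later stage that applies backpressure. Awaiting is fine
// here because we are no longer on the broker's thread.
static ChannelReader<int> Buffered(ChannelReader<int> source, int limit)
{
    var bounded = Channel.CreateBounded<int>(limit);
    _ = Task.Run(async () =>
    {
        await foreach (var item in source.ReadAllAsync())
            await bounded.Writer.WriteAsync(item);
        bounded.Writer.Complete();
    });
    return bounded.Reader;
}

var received = 0;
await foreach (var item in Buffered(unbounded.Reader, limit: 8))
    received++;

Console.WriteLine(received); // 100: nothing dropped despite the small bound
```

The trade-off is that the unbounded stage can grow without limit if the consumer is persistently slower than the broker.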
Otherwise, you'll have to block the callback:
broker.Subscribe((_, args) =>
{
    writer.WriteAsync(args, token).AsTask().Wait();
});
That's not an ideal solution though.
In both cases, you can consume the data produced by the reader:
var token = cts.Token;
var reader = broker.ToChannel(10, token);
await foreach (var item in reader.ReadAllAsync(token))
{
    ...
}
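To illustrate the composition point, here is a hypothetical second stage written in the same reader-in/reader-out shape (the Transform name and its behaviour are my own, not part of any library): because every stage accepts and returns a ChannelReader, stages chain naturally.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var source = Channel.CreateUnbounded<int>();
for (int i = 1; i <= 5; i++) source.Writer.TryWrite(i);
source.Writer.Complete();

// Stages compose by chaining: reader -> reader -> await foreach.
var sum = 0;
await foreach (var item in source.Reader.Transform(x => x * 10))
    sum += item;
Console.WriteLine(sum); // 150

static class ChannelExtensions
{
    // A stage that accepts a ChannelReader and returns a ChannelReader,
    // creating and owning its output channel.
    public static ChannelReader<TOut> Transform<TIn, TOut>(
        this ChannelReader<TIn> source, Func<TIn, TOut> map,
        CancellationToken token = default)
    {
        var channel = Channel.CreateUnbounded<TOut>();
        _ = Task.Run(async () =>
        {
            await foreach (var item in source.ReadAllAsync(token))
                channel.Writer.TryWrite(map(item));
            channel.Writer.Complete();
        }, token);
        return channel.Reader;
    }
}
```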
Upvotes: 1
Reputation: 43836
Am I overengineering this?
No. A Channel is exactly the kind of component you need in order to implement this functionality. It's a quite simple mechanism: basically an async version of the BlockingCollection<T> class, with some extra features (like the Completion property) and a fancy API (the Reader and Writer facades).
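A tiny sketch of those pieces, assuming nothing beyond the standard System.Threading.Channels API:

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();

// The Writer facade: producers only see the write side.
channel.Writer.TryWrite(42);
channel.Writer.Complete(); // no more items will ever be written

// The Reader facade: consumers only see the read side.
int item = await channel.Reader.ReadAsync();

// Completion: a Task that finishes once the channel has been marked
// complete AND every buffered item has been consumed.
await channel.Reader.Completion;

Console.WriteLine(item); // 42
```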
Is my EnumerableBroker wrapper implementation safe, or have I fallen into an async trap somewhere?
Yes, there is a trap, and you have fallen into it. The SingleWriter = true configuration means that at most one WriteAsync operation is allowed to be in-flight at a time: before issuing the next WriteAsync, the previous one must have completed. By subscribing to the broker with an async void delegate, you are essentially creating a separate writer (producer) for each message pushed by the broker. Most probably the component will complain about this misuse by throwing InvalidOperationExceptions. The solution is not to switch to SingleWriter = false, though. That would just circumvent the bounded capacity of the Channel, by creating an external (and highly inefficient) queue of messages that don't fit in the Channel's internal queue. The solution is to rethink your buffering strategy. If you can't afford to buffer an unlimited number of messages, you must either drop messages, or throw an exception and kill the consumer. Instead of await buffer.Writer.WriteAsync, it's better to feed the channel synchronously with bool accepted = buffer.Writer.TryWrite, and take an appropriate action in case the accepted is false.
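A sketch of that synchronous feed (the drop-counting policy shown is just one illustrative choice; faulting the channel is the other option mentioned above):

```csharp
using System;
using System.Threading;
using System.Threading.Channels;

// A bounded channel with room for only 2 unread messages.
var buffer = Channel.CreateBounded<int>(
    new BoundedChannelOptions(2) { SingleReader = true, SingleWriter = true });

int dropped = 0;
for (int msg = 0; msg < 5; msg++) // stand-in for the broker callback firing
{
    // TryWrite never blocks the broker's thread; false means the buffer is full.
    bool accepted = buffer.Writer.TryWrite(msg);
    if (!accepted)
    {
        // Pick a policy: count/log the drop, or fault the channel to
        // kill the consumer instead of silently losing data, e.g.
        // buffer.Writer.Complete(new InvalidOperationException("overflow"));
        Interlocked.Increment(ref dropped);
    }
}

Console.WriteLine(dropped); // 3: two messages fit, three were dropped
```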
Another consideration that you should have in mind is that the ChannelReader.ReadAllAsync method is consuming. This means that if you have multiple readers/consumers of the same channel, each message will be delivered to only one of the consumers. In other words, each consumer will receive a partial subset of the channel's messages. You should communicate this to your coworkers, because it's quite trivial to enumerate the same IAsyncEnumerable<T> more than once. After all, an IAsyncEnumerable<T> is nothing more than a factory of IAsyncEnumerator<T>s.
Finally, instead of controlling the lifetime of each subscription with a CancellationToken, you can make your coworkers' lives easier by terminating a subscription automatically when the enumeration of an IAsyncEnumerator<T> terminates. When an await foreach loop ends in any way (like by break or by an exception), the associated IAsyncEnumerator<T> is automatically disposed. The C# language has cleverly hooked the DisposeAsync invocation into the finally block of the iterator, if a try/finally block wraps the yielding loop. You could take advantage of this great feature like this:
public async IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
{
    broker.Subscribe
    (
        //...
    );
    broker.Start();
    try
    {
        await foreach (var msg in buffer.Reader.ReadAllAsync(ct))
        {
            yield return msg;
        }
    }
    finally
    {
        broker.Stop();
    }
}
Upvotes: 6