I have an async function that I want to invoke on every observation in an IObservable sequence, limiting delivery to one event at a time. The consumer expects no more than one message in flight, and this is also the Rx contract, if I understand it correctly.
Consider this sample:
static void Main() {
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
    //var d = ob.Subscribe(async x => await Consume(x)); // Does not rate-limit.
    var d = ob.Subscribe(x => Consume(x).Wait());
    Thread.Sleep(10000);
    d.Dispose();
}
static async Task<Unit> Consume(long count) {
    Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
    await Task.Delay(750);
    Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
    return Unit.Default;
}
The Consume function fakes a 750 ms processing time, and ob produces events every 100 ms. The code above works, but calls task.Wait() on a random thread. If I instead subscribe as in the commented out line 3, then Consume is invoked at the same rate at which ob produces events (and I cannot even grok what overload of Subscribe I am using in this commented statement, so it is probably nonsense).
So how do I correctly deliver one event at a time from an observable sequence to an async function?
Upvotes: 20
Views: 20485
Reputation: 10783
Subscribers are not supposed to be long-running, and therefore there isn't support for executing long-running async methods in the Subscribe handlers.
Instead, consider your async method to be a single-value observable sequence that takes a value from another sequence. Now you can compose sequences, which is what Rx was designed to do.
Now that you have made that leap, you will probably have something like what @Reijher creates in "Howto call back async function from rx subscribe?".
The breakdown of his code is as follows.
// The input sequence. Produces values potentially quicker than the consumer
Observable.Interval(TimeSpan.FromSeconds(1))
    // Project the event you receive into the result of the async method
    .Select(l => Observable.FromAsync(() => asyncMethod(l)))
    // Ensure that the results are serialized
    .Concat()
    // Do what you will here with the results of the async method calls
    .Subscribe();
In this scenario, you are creating implicit queues. In any problem where the producer is faster than the consumer, a queue is needed to hold values while they wait. Personally, I prefer to make this explicit by putting data into a queue. Alternatively, you could explicitly use a Scheduler to signal that it is the threading model that should be picking up the slack.
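One way to make the queue explicit is sketched below. This is only an illustration under assumptions, not the code from the linked answer: it uses System.Threading.Channels as the queue, and the names ExplicitQueueSketch, RunAsync and the in-flight counter are hypothetical, added only to show that no more than one Consume call runs at a time. The subscriber does nothing but enqueue, and a single loop awaits each item in turn.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ExplicitQueueSketch
{
    private static int _inFlight; // number of Consume calls currently running

    // Drains `source` through an explicit queue and returns the highest number
    // of Consume calls that were observed running at the same time.
    public static async Task<int> RunAsync(IObservable<long> source)
    {
        var maxInFlight = 0;

        // Unbounded queue: the producer never waits; values pile up here
        // while the consumer is busy.
        var queue = Channel.CreateUnbounded<long>();

        // The subscriber only enqueues, so OnNext returns immediately.
        using var subscription = source.Subscribe(
            x => queue.Writer.TryWrite(x),
            ex => queue.Writer.TryComplete(ex),
            () => queue.Writer.TryComplete());

        // Single consumer loop: the next item is not taken until the
        // previous Consume call has finished.
        await foreach (var x in queue.Reader.ReadAllAsync())
        {
            maxInFlight = Math.Max(maxInFlight, await Consume(x));
        }

        return maxInFlight;
    }

    // Fakes 750 ms of work and reports how many calls were running on entry.
    private static async Task<int> Consume(long count)
    {
        var running = Interlocked.Increment(ref _inFlight);
        Console.WriteLine($"Consuming {count}");
        await Task.Delay(750);
        Interlocked.Decrement(ref _inFlight);
        return running;
    }

    public static async Task Main()
    {
        var ob = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5);
        Console.WriteLine($"Max in flight: {await RunAsync(ob)}");
    }
}
```

Because the queue is unbounded, a producer that stays faster than the consumer makes it grow without limit. A bounded channel or a drop policy would be the next design decision, and that choice is the kind of thing an explicit queue forces you to make.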
This seems to be a popular hurdle (executing async in a subscribe handler) for Rx newcomers. There are many reasons that the guidance is to not put them in your subscriber, for example:
1. You break the error model.
2. You are mixing async models (Rx here, Task there).
3. Subscribe is the consumer of a composition of async sequences. An async method is just a single-value sequence, so by that view it can't be the end of the sequence, though its result might be.
UPDATE
To illustrate the comment about breaking the error model, here is an update of the OP's sample.
void Main()
{
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
    var d = ob.Subscribe(
        x => ConsumeThrows(x).Wait(),
        ex => Console.WriteLine("I will not get hit"));
    Thread.Sleep(10000);
    d.Dispose();
}
static async Task<Unit> ConsumeThrows(long count)
{
    return await Task.FromException<Unit>(new Exception("some failure"));
    // This will have the same effect of bringing down the application.
    //throw new Exception("some failure");
}
Here we can see that if the OnNext handler were to throw, then we are not protected by our Rx OnError handler.
The exception would be unhandled and most likely bring down the application.
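For contrast, here is a minimal sketch of the same failing method composed into the sequence with Select, Observable.FromAsync and Concat, as in the breakdown earlier in this answer. The class name ErrorModelSketch and the TaskCompletionSource plumbing are hypothetical, added only so the program can wait for the error. In this shape the faulted task should surface through the OnError handler instead of escaping from OnNext.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ErrorModelSketch
{
    // Subscribes to the composed sequence and returns a task that completes
    // with whatever exception reaches the OnError handler.
    public static Task<Exception> RunAsync()
    {
        var seen = new TaskCompletionSource<Exception>();

        Observable.Interval(TimeSpan.FromMilliseconds(100))
            // Each value becomes a single-value sequence wrapping the async call.
            .Select(x => Observable.FromAsync(() => ConsumeThrows(x)))
            // Run the inner sequences one after another.
            .Concat()
            .Subscribe(
                _ => { },
                // The failure arrives here, inside the Rx error model.
                ex => seen.TrySetResult(ex));

        return seen.Task;
    }

    // Same failing consumer as in the sample above.
    private static Task<Unit> ConsumeThrows(long count) =>
        Task.FromException<Unit>(new Exception("some failure"));

    public static async Task Main()
    {
        var ex = await RunAsync();
        Console.WriteLine($"OnError received: {ex.Message}");
    }
}
```

The error terminates the sequence, so the subscription ends on its own and the sketch does not dispose it explicitly.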
Upvotes: 34