Reputation: 656
In my application I'm receiving events from a message bus (RabbitMQ, but that does not really matter). Processing these events takes rather long, and the events are produced in bursts. Only one event should be processed at a time. To achieve this I'm using Rx to serialize the events and run the processing asynchronously so that the producer is not blocked.
Here is the (simplified) sample code:
    static void Main(string[] args)
    {
        var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
        var subscription = input.Select(i => Observable.FromAsync(ct => Process(ct, i)))
            .Concat()
            .Subscribe();
        Console.ReadLine();
        subscription.Dispose();
        // Wait until finished?
        Console.WriteLine("Completed");
    }

    private static async Task Process(CancellationToken cts, long i)
    {
        Console.WriteLine($"Processing {i} ...");
        await Task.Delay(1000).ConfigureAwait(false);
        Console.WriteLine($"Finished processing {i}");
    }
The sample application disposes the subscription and then terminates. The problem is that it terminates while the last event received before the subscription was disposed is still being processed.
Question: What is the best way to wait until the last event has been processed after the subscription is disposed? I'm still trying to wrap my head around the async Rx stuff. I assume there is a rather easy way to do this that I'm just not seeing right now.
Upvotes: 1
Views: 859
Reputation: 656
Thanks to the suggestion from Theodor, I have now found a solution that works for me:
    static async Task Main(string[] args)
    {
        var inputSequence = Observable.Interval(TimeSpan.FromMilliseconds(500));
        var terminate = new Subject<Unit>();
        var task = Execute(inputSequence, terminate);
        Console.ReadLine();
        terminate.OnNext(Unit.Default);
        await task.ConfigureAwait(false);
        Console.WriteLine("Completed");
        Console.ReadLine();
    }

    private static async Task Process(CancellationToken cts, long i)
    {
        Console.WriteLine($"Processing {i} ...");
        await Task.Delay(1000).ConfigureAwait(false);
        Console.WriteLine($"Finished processing {i}");
    }

    private static async Task Execute(IObservable<long> input, IObservable<Unit> terminate)
    {
        await input
            .TakeUntil(terminate)
            .Select(i => Observable.FromAsync(ct => Process(ct, i)))
            .Concat();
    }
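One edge case worth noting: awaiting an observable that completes without emitting anything throws an InvalidOperationException ("Sequence contains no elements"). With the code above this can happen if terminate fires before the first event arrives. A minimal sketch of one way to guard against it, assuming the System.Reactive package; the immediate terminate.OnNext call is only there to provoke the empty case, and DefaultIfEmpty is my addition, not part of the code above:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

internal static class Program
{
    private static async Task Main()
    {
        var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
        var terminate = new Subject<Unit>();

        // Execute subscribes synchronously, up to its first await.
        var task = Execute(input, terminate);

        // Terminate right away, before Interval has produced its first value.
        terminate.OnNext(Unit.Default);

        await task.ConfigureAwait(false);
        Console.WriteLine("Completed");
    }

    private static async Task Process(CancellationToken ct, long i)
    {
        Console.WriteLine($"Processing {i} ...");
        await Task.Delay(1000).ConfigureAwait(false);
        Console.WriteLine($"Finished processing {i}");
    }

    internal static async Task Execute(IObservable<long> input, IObservable<Unit> terminate)
    {
        await input
            .TakeUntil(terminate)
            .Select(i => Observable.FromAsync(ct => Process(ct, i)))
            .Concat()
            // Emits a single default Unit if nothing was processed,
            // so the await does not throw on an empty sequence.
            .DefaultIfEmpty();
    }
}
```

If events were processed before termination, DefaultIfEmpty just passes them through, so the normal path behaves the same as before.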
Upvotes: 1
Reputation: 43525
If you are OK with a quick and easy solution that is applicable to simple cases like this, then you could just Wait the IObservable instead of subscribing to it. And if you want to receive subscription-like notifications, use the Do operator just before the final Wait:
    static void Main(string[] args)
    {
        var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
        input.Select(i => Observable.FromAsync(ct => Process(ct, i)))
            .Concat()
            .Do(onNext: x => { }, onError: ex => { }, onCompleted: () => { })
            .Wait();
        Console.WriteLine("Completed");
    }
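Keep in mind that Wait blocks until the sequence completes, and Observable.Interval never completes on its own, so the sequence needs some kind of stop signal. Here is a rough sketch of how that could look, assuming the System.Reactive package. The RunUntil helper and the Enter-key stop signal built with Observable.Start are hypothetical names and choices of mine for illustration:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

internal static class Program
{
    private static void Main()
    {
        var input = Observable.Interval(TimeSpan.FromMilliseconds(500));

        // Hypothetical stop signal: emits once Enter is pressed
        // (Console.ReadLine runs on a background thread).
        var stop = Observable.Start(() => Console.ReadLine());

        RunUntil(input, stop);
        Console.WriteLine("Completed");
    }

    // Blocks until 'stop' has fired and all already-received work has finished.
    internal static void RunUntil<TStop>(IObservable<long> input, IObservable<TStop> stop)
    {
        input
            .TakeUntil(stop) // stop accepting new events
            .Select(i => Observable.FromAsync(ct => Process(ct, i)))
            .Concat()          // run the inner sequences one at a time
            .DefaultIfEmpty()  // Wait throws on an empty sequence
            .Wait();
    }

    private static async Task Process(CancellationToken ct, long i)
    {
        Console.WriteLine($"Processing {i} ...");
        await Task.Delay(1000).ConfigureAwait(false);
        Console.WriteLine($"Finished processing {i}");
    }
}
```

Note that Concat still runs every inner sequence it received before the stop signal, so events that were already queued are processed before Wait returns, not just the one in flight.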
Upvotes: 2