harri
harri

Reputation: 656

Reactive Extensions Observerable.FromAsync: How to wait until async operation is finished

In my application I´m getting events from a message bus (rabbit mq, but actually it does not really matter). The processing of this events is taking rather long and the events are produced in bursts. Only one event should be processed at a time. To fulfill this I´m using Rx in order to serialize the events and execute the processing asynchronously in order to not block the producer.

Here is the (simplified) sample code:

static void Main(string[] args)
{
   var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
   var subscription = input.Select(i => Observable.FromAsync(ct => Process(ct, i)))
                           .Concat()
                           .Subscribe();

   Console.ReadLine();
   subscription.Dispose();
   // Wait until finished?
   Console.WriteLine("Completed");
}

private static async Task Process(CancellationToken cts, long i)
{
   Console.WriteLine($"Processing {i} ...");
   await Task.Delay(1000).ConfigureAwait(false);
   Console.WriteLine($"Finished processing {i}");
}

The sample application is disposing the subscription and then the application is terminated. But in this sample, the application is terminating while the last event received before the subscription is still being processed.

Question: What is the best way to wait until the last event is processed after the subscription is disposed? I´m still trying to wrap my head around the async Rx stuff. I assume there is a rather easy way to do this that I´m just not seeing right now.

Upvotes: 1

Views: 859

Answers (2)

harri
harri

Reputation: 656

Thanks to the suggestion from Theodor I now found a solution that works for me:

static async Task Main(string[] args)
{
    var inputSequence = Observable.Interval(TimeSpan.FromMilliseconds(500));
    var terminate = new Subject<Unit>();
    var task = Execute(inputSequence, terminate);

    Console.ReadLine();
    terminate.OnNext(Unit.Default);
    await task.ConfigureAwait(false);
    Console.WriteLine($"Completed");
    Console.ReadLine();
}

private static async Task Process(CancellationToken cts, long i)
{
    Console.WriteLine($"Processing {i} ...");
    await Task.Delay(1000).ConfigureAwait(false);
    Console.WriteLine($"Finished processing {i}");
}

private static async Task Execute(IObservable<long> input, IObservable<Unit> terminate)
{
    await input
          .TakeUntil(terminate)
          .Select(i => Observable.FromAsync(ct => Process(ct, i)))
          .Concat();
}

Upvotes: 1

Theodor Zoulias
Theodor Zoulias

Reputation: 43525

If you are OK with a quick and easy solution, that is applicable for simple cases like this, then you could just Wait the IObservable instead of subscribing to it. And if you want to receive subscription-like notifications, use the Do operator just before the final Wait:

static void Main(string[] args)
{
    var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
    input.Select(i => Observable.FromAsync(ct => Process(ct, i)))
        .Concat()
        .Do(onNext: x => { }, onError: ex => { }, onCompleted: () => { })
        .Wait();
    Console.WriteLine("Completed");
}

Upvotes: 2

Related Questions