Daniel Little
Daniel Little

Reputation: 17263

Reactive Extensions Subscribe calling await

I want to perform an async call based for each event raised by a Reactive Extensions Observable. I'm also trying to keep everything synchronized as I want the async call to finish before the next event is handled.

How would one go about doing something similar to the following? I say similar as the code below does not compile.

settingsChangedInMemory
    .Subscribe(async _ => {
        var settings = Extract();
        await SaveSettings(settings);
    });

I'm not sure if it changes anything, but I would need to Subscribe to multiple Observables. For example another subscription like this.

settingsChangedOnDisk
    .Subscribe(async _ => {
        var settings = await ReadSettings(settings);
        Apply(settings);
    });

How would you use Reactive Extensions to do this?

Upvotes: 17

Views: 8106

Answers (2)

Muhammad Rehan Saeed
Muhammad Rehan Saeed

Reputation: 38457

You can use the new ForEachAsync method released in Reactive Extensions (Rx) 2.0 like so:

await observable
    .ForEachAsync(async x =>
    {
        Console.WriteLine(x);
        await Task.Delay(1000);
    });

ForEachAsync returns a Task<T> which completes when the observable completes. More information in my blog post here or this blog post by the reactive extensions team.

Upvotes: 3

Ana Betts
Ana Betts

Reputation: 74654

How about:

settingsChangedInMemory
    .SelectMany(async _ => await SaveSettings(Extract()))
    .Subscribe(x => Apply(x));

Never put an async in a Subscribe, you always want to put it in a SelectMany instead.

Upvotes: 23

Related Questions