Reputation: 17263
I want to perform an async call for each event raised by a Reactive Extensions Observable. I'm also trying to keep everything synchronized, as I want the async call to finish before the next event is handled.
How would one go about doing something similar to the following? I say similar as the code below does not compile.
settingsChangedInMemory
.Subscribe(async _ => {
var settings = Extract();
await SaveSettings(settings);
});
I'm not sure if it changes anything, but I would need to Subscribe to multiple Observables. For example, another subscription like this:
settingsChangedOnDisk
.Subscribe(async _ => {
var settings = await ReadSettings(settings);
Apply(settings);
});
How would you use Reactive Extensions to do this?
Upvotes: 17
Views: 8106
Reputation: 38457
You can use the new ForEachAsync method released in Reactive Extensions (Rx) 2.0 like so:
await observable
.ForEachAsync(async x =>
{
Console.WriteLine(x);
await Task.Delay(1000);
});
ForEachAsync returns a Task which completes when the observable completes. More information in my blog post here or this blog post by the reactive extensions team.
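To illustrate the point about the returned Task, here is a minimal sketch. The class and method names are made up for the example, and it assumes the System.Reactive package is referenced. It passes a plain synchronous callback, because as far as I know ForEachAsync takes an Action<T>, so an async lambda would be treated as async void and not awaited between items.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ForEachAsyncSketch
{
    // Hypothetical helper: collects every value the observable emits.
    public static async Task<List<int>> CollectAsync()
    {
        var seen = new List<int>();

        // The await resumes only after the observable has completed,
        // so every item has been handled by the time we return.
        await Observable.Range(1, 3).ForEachAsync(x => seen.Add(x));

        return seen;
    }
}
```

Calling `await ForEachAsyncSketch.CollectAsync()` should hand back a list holding all three values from the range.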
Upvotes: 3
Reputation: 74654
How about:
settingsChangedInMemory
.SelectMany(async _ => await SaveSettings(Extract()))
.Subscribe(x => Apply(x));
Never put an async in a Subscribe, you always want to put it in a SelectMany instead.
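One caveat worth sketching: SelectMany starts the async call for each event as it arrives and merges the results, so calls can overlap. If each call has to finish before the next one starts, as the question asks, one possible approach is to wrap the call in Observable.FromAsync and chain the results with Concat. In the sketch below, SaveAsync is a hypothetical stand-in for SaveSettings and the log list exists only to show the ordering; neither comes from the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class SequentialSaveSketch
{
    // Hypothetical stand-in for SaveSettings: records when it starts and ends.
    private static async Task<int> SaveAsync(int settings, List<string> log)
    {
        log.Add($"start {settings}");
        await Task.Delay(50);
        log.Add($"end {settings}");
        return settings;
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        await Observable.Range(1, 3)
            // FromAsync defers the call until the inner observable is subscribed.
            .Select(x => Observable.FromAsync(() => SaveAsync(x, log)))
            // Concat subscribes to the next inner observable only after the
            // previous one completes, so the saves run one at a time.
            .Concat()
            .ForEachAsync(_ => { });

        return log;
    }
}
```

With this shape the log should show each "start" immediately followed by its matching "end", rather than several starts in a row. Events that arrive while a save is still running are queued by Concat, so nothing is dropped.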
Upvotes: 23