Reputation: 375
I am quite new to work with Reactive Extensions, so this might be a newbie question, but i have the following scenario:
I fetch 3 IEnumerable lists (of different types) from the database and populate a view model. I would however like to coordinate the subscriptions to trigger something when all lists have completed loading. Is this possible with Reactive Extensions or am i thinking in the wrong way? Code loks like this:
GetCustomers()
.ToObservable(Scheduler.Default)
.Buffer(20).ObserveOn(SynchronizationContext.Current)
.Subscribe(View.Model.AddRange);
GetCountries()
.ToObservable(Scheduler.Default)
.Buffer(20).ObserveOn(SynchronizationContext.Current)
.Subscribe(View.Model.AddRange);
GetTransports()
.ToObservable(Scheduler.Default)
.Buffer(20).ObserveOn(SynchronizationContext.Current)
.Subscribe(View.Model.AddRange);
Upvotes: 2
Views: 360
Reputation: 12667
I'm not sure why you'd change an already synchronous Enumerable into an Observable, but in Rx idioms, you could:
Observable.Merge(Add(GetCustomers()), Add(GetCountries())..., Add(GetTransports()))
.Subscribe(() => { }, Completed);
where Add could be:
private IObservable<Unit> Add<T>(IObservable<T> o)
{
return o.Buffer(20)
.ObserveOn(SynchronizationContext.Current)
.Do(View.Model.AddRange)
.Select(_ => Unit.Default);
}
Upvotes: 0
Reputation: 117064
You could try using observable joins. Something like this:
var plan =
Observable.Start(() => GetCountries())
.And(Observable.Start(() => GetCustomers()))
.And(Observable.Start(() => GetTransports()))
.Then((countries, customers, transports)
=> new { countries, customers, transports });
var query =
Observable.When(new [] { plan });
query
.Subscribe(cct =>
{
View.Model.AddRange(cct.countries);
View.Model.AddRange(cct.customers);
View.Model.AddRange(cct.transports);
});
It runs in parallel and you get all the results in one at the end.
Upvotes: 3