carl
carl

Reputation: 375

Synchronizing multiple observable streams

I am quite new to work with Reactive Extensions, so this might be a newbie question, but i have the following scenario:

I fetch 3 IEnumerable lists (of different types) from the database and populate a view model. I would however like to coordinate the subscriptions to trigger something when all lists have completed loading. Is this possible with Reactive Extensions or am i thinking in the wrong way? Code loks like this:

GetCustomers()
    .ToObservable(Scheduler.Default)
    .Buffer(20).ObserveOn(SynchronizationContext.Current)
    .Subscribe(View.Model.AddRange);
GetCountries()
    .ToObservable(Scheduler.Default)
    .Buffer(20).ObserveOn(SynchronizationContext.Current)
    .Subscribe(View.Model.AddRange);
GetTransports()
    .ToObservable(Scheduler.Default)
    .Buffer(20).ObserveOn(SynchronizationContext.Current)
    .Subscribe(View.Model.AddRange);

Upvotes: 2

Views: 360

Answers (2)

Asti
Asti

Reputation: 12667

I'm not sure why you'd change an already synchronous Enumerable into an Observable, but in Rx idioms, you could:

Observable.Merge(Add(GetCustomers()), Add(GetCountries())..., Add(GetTransports()))
           .Subscribe(() => { }, Completed);

where Add could be:

    private IObservable<Unit> Add<T>(IObservable<T> o)
    {
        return o.Buffer(20)
                .ObserveOn(SynchronizationContext.Current)
                .Do(View.Model.AddRange)
                .Select(_ => Unit.Default);
    }

Upvotes: 0

Enigmativity
Enigmativity

Reputation: 117064

You could try using observable joins. Something like this:

var plan =
    Observable.Start(() => GetCountries())
        .And(Observable.Start(() => GetCustomers()))
        .And(Observable.Start(() => GetTransports()))
        .Then((countries, customers, transports)
            => new { countries, customers, transports });

var query =
    Observable.When(new [] { plan });

query
    .Subscribe(cct =>
    {
        View.Model.AddRange(cct.countries);
        View.Model.AddRange(cct.customers);
        View.Model.AddRange(cct.transports);
    });

It runs in parallel and you get all the results in one at the end.

Upvotes: 3

Related Questions