Balraj Singh

Reputation: 3471

Combining two Rx stream conditionally

I am trying to implement a scenario using Rx where I have two hot observables, Stream 1 and Stream 2. Based on the data from Stream 1, I need to start or stop Stream 2, and then combine the data from both streams into one using CombineLatest. Below is the code that I have been able to come up with.

  1. Is there a better way to implement this?

  2. How can I make it more generic? I will have Stream 1 and then Streams 2 .. n. Each of Streams 2 .. n has its own condition (Condition 2 .. n) that uses the data of Stream 1 to decide whether that stream should be started or not. All of the data should then be combined in a CombineLatest manner.

CODE:

        IDisposable TfsDisposable = null;

        // stream 1
        var hotObs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));


        // stream 2
        var hotObs2 = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)).Publish();


        var observerHot =  hotObs.Do(a => 
        {
            // Based on Condition to start the second stream
            if (ConditionToStartStream2)
            {
                TfsDisposable = TfsDisposable ?? hotObs2.Connect();
            }
        })
        .Do(a => 
        {
            // Based on condition 2 stop the second stream
            if (ConditionToStopStream2)
            {
                TfsDisposable?.Dispose();
                TfsDisposable = null;
            }
        }).Publish();


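        // Combine both streams using CombineLatest. The published observerHot is used here
        // so that the Do side effects and the combined values share one timer subscription.
        var finalMergedData  = observerHot.CombineLatest(hotObs2, (a, b) => { return string.Format("{0}, {1}", a, b); });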

        // Display the result
        finalMergedData.Subscribe(a => { Console.WriteLine("result: {0}", a);  });

        // Start the first hot observable
        observerHot.Connect();

Upvotes: 2

Views: 1084

Answers (1)

Enigmativity

Reputation: 117064

Have a play with this:

var hotObs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0));
var hotObs2 = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(0.3));

var query =
    hotObs2.Publish(h2s =>
        hotObs.Publish(hs =>
            hs
                .Select(a => a % 7 == 0 ? h2s : Observable.Empty<long>())
                .Switch()
                .Merge(hs)));

This publishes both observables using the overload of Publish that takes a lambda. Each source is shared (hot) within the scope of the lambda, which avoids having to muck around with managing the calls to .Connect().
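To see what the lambda overload of Publish buys you, here is a minimal sketch. The class name PublishSelectorDemo and the subscription counter are made up for illustration; Defer, Range, Zip, Skip and Publish are the standard System.Reactive operators. The shared sequence is used twice inside the lambda, yet the cold source is subscribed only once.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PublishSelectorDemo
{
    // Returns how many times the cold source was subscribed, plus the values produced.
    public static (int Subscriptions, List<int> Values) Run()
    {
        var subscriptions = 0;

        // Cold source that counts every subscription it receives.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Range(1, 3);
        });

        var values = new List<int>();

        // `shared` appears twice in the lambda, but Publish subscribes to `source` once
        // and connects automatically after the inner query has been wired up.
        source
            .Publish(shared => shared.Zip(shared.Skip(1), (a, b) => a + b))
            .Subscribe(x => values.Add(x));

        return (subscriptions, values);
    }
}
```

Calling PublishSelectorDemo.Run() should report a single subscription and the pairwise sums 3 and 5. Without Publish, the same query would subscribe to the source twice.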

Then I'm simply performing a conditional check on each value of hs (in this case, whether a is divisible by 7), returning the other stream if the check passes and an empty stream if it does not.

Then the .Switch() turns the IObservable<IObservable<long>> into an IObservable<long> by only taking values from the latest inner observable.
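The behaviour of Switch can be sketched in isolation with subjects, which makes the timing deterministic. The class name SwitchDemo and the string values are invented for this illustration; only the most recently received inner observable is forwarded, and pushing an empty observable acts as "closing the gate".

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchDemo
{
    public static List<string> Run()
    {
        var outer = new Subject<IObservable<string>>();
        var first = new Subject<string>();
        var second = new Subject<string>();
        var seen = new List<string>();

        using (outer.Switch().Subscribe(x => seen.Add(x)))
        {
            outer.OnNext(first);
            first.OnNext("first-1");    // forwarded: `first` is the latest inner stream
            outer.OnNext(second);       // Switch unsubscribes from `first`
            first.OnNext("first-2");    // dropped
            second.OnNext("second-1");  // forwarded
            outer.OnNext(Observable.Empty<string>()); // nothing is forwarded from here on
            second.OnNext("second-2");  // dropped
        }

        return seen;
    }
}
```

SwitchDemo.Run() should return only "first-1" and "second-1".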

Finally, the result is merged with the original hs stream.

Running the query with hotObs ticking every 1.0 s and hotObs2 every 0.3 s, I get the following output:

0 
1 
2 
3 
1 
2 
3 
4 
5 
6 
7 
23 
24 
25 
8 
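For the second part of the question (Streams 2 .. n, each with its own condition on Stream 1, combined CombineLatest-style), the same Select/Switch idea might be wrapped in a helper. This is only a sketch under assumptions: GateBy, CombineGated and the tuple shape are hypothetical names, not part of Rx, and all streams are assumed to share one element type so that the CombineLatest overload taking a collection can be used. The gated streams should already be hot, otherwise each reopening of a gate restarts them.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class GatedCombine
{
    // Hypothetical helper: forwards `gated` only while the latest value
    // of `control` satisfies `isOpen`.
    public static IObservable<T> GateBy<TControl, T>(
        this IObservable<T> gated,
        IObservable<TControl> control,
        Func<TControl, bool> isOpen)
    {
        return control
            .Select(isOpen)
            .DistinctUntilChanged() // resubscribe only when the gate actually flips
            .Select(open => open ? gated : Observable.Empty<T>())
            .Switch();
    }

    // Hypothetical helper: gates each stream with its own condition on `control`,
    // then combines `control` and all gated streams with CombineLatest.
    public static IObservable<IList<T>> CombineGated<T>(
        IObservable<T> control,
        IEnumerable<(IObservable<T> Stream, Func<T, bool> Condition)> others)
    {
        // Publish shares one subscription to `control` across all gates.
        return control.Publish(c =>
            Observable.CombineLatest(
                new[] { c }.Concat(others.Select(o => o.Stream.GateBy(c, o.Condition)))));
    }
}
```

Two caveats follow from how CombineLatest works: nothing is emitted until every stream has produced at least one value, and when a gate closes, the last value of that stream keeps appearing in the combined result. If a closed stream should drop out of the result instead, a different combining strategy would be needed.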

Upvotes: 1
