Reputation: 3471
I am trying to implement a scenario using Rx where I have two hot observables, Stream 1 and Stream 2. Based on the data from Stream 1, I need to start or stop Stream 2, then combine the data from both streams into one using CombineLatest. Below is the code I have been able to come up with.
Is there a better way to implement this?
And how can I make it more generic? I will have Stream 1 and then Streams 2..n. For each of Streams 2..n there is a condition (Condition 2..n) that uses the data from Stream 1 to decide whether that stream should start, and all the data is then combined in a CombineLatest manner.
CODE:
IDisposable TfsDisposable = null;
// stream 1
var hotObs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));
// stream 2
var hotObs2 = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)).Publish();
var observerHot = hotObs.Do(a =>
    {
        // Based on Condition to start the second stream
        if (ConditionToStartStream2)
        {
            TfsDisposable = TfsDisposable ?? hotObs2.Connect();
        }
    })
    .Do(a =>
    {
        // Based on condition 2 stop the second stream
        if (ConditionToStopStream2)
        {
            TfsDisposable?.Dispose();
            TfsDisposable = null;
        }
    }).Publish();
// Merge both the stream using Combine Latest
var finalMergedData = hotObs.CombineLatest(hotObs2, (a, b) => { return string.Format("{0}, {1}", a, b); });
// Display the result
finalMergedData.Subscribe(a => { Console.WriteLine("result: {0}", a); });
// Start the first hot observable
observerHot.Connect();
Upvotes: 2
Views: 1084
Reputation: 117064
Have a play with this:
var hotObs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0));
var hotObs2 = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(0.3));
var query =
    hotObs2.Publish(h2s =>
        hotObs.Publish(hs =>
            hs
                .Select(a => a % 7 == 0 ? h2s : Observable.Empty<long>())
                .Switch()
                .Merge(hs)));
This takes both observables and publishes them using the overload of Publish that takes a lambda. It makes them hot within the scope of the lambda and removes the need to muck around with managing the calls to .Connect().
Then I'm simply performing a conditional check (in this case, whether a is divisible by 7) and returning the other stream if it passes, or an empty stream if not.
Then the .Switch() turns the IObservable<IObservable<long>> into an IObservable<long> by only taking values from the latest inner observable.
Finally, it merges the result with the original hs stream.
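If the timers make the gating hard to follow, here is a minimal sketch of the same Publish / Select / Switch / Merge shape driven by hand with Subject<long> instead of Observable.Timer, so the order of values is deterministic. The names GateDemo, stream1, stream2 and the "s1:" / "s2:" tags are only for illustration and are not part of the query above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class GateDemo
{
    public static List<string> Run()
    {
        // Subjects stand in for the two timers so values can be pushed by hand.
        var stream1 = new Subject<long>();
        var stream2 = new Subject<long>();
        var seen = new List<string>();

        var query =
            stream2.Publish(s2 =>
                stream1.Publish(s1 =>
                    s1
                        // Same condition as above: let stream 2 through only
                        // while the latest stream 1 value is divisible by 7.
                        .Select(a => a % 7 == 0 ? s2 : Observable.Empty<long>())
                        .Switch()
                        .Select(b => "s2:" + b)
                        .Merge(s1.Select(a => "s1:" + a))));

        using (query.Subscribe(x => seen.Add(x)))
        {
            stream1.OnNext(0);    // 0 % 7 == 0, gate opens
            stream2.OnNext(100);  // passes through
            stream1.OnNext(1);    // gate closes (switches to the empty stream)
            stream2.OnNext(101);  // dropped, nothing is listening to stream 2
            stream1.OnNext(7);    // gate opens again
            stream2.OnNext(102);  // passes through
        }

        return seen;
    }
}
```

Calling GateDemo.Run() should give s1:0, s2:100, s1:1, s1:7, s2:102. The value 101 is missing because Switch had already moved on to the empty inner observable when it arrived.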
With the example code above I get the following output:
0 1 2 3 1 2 3 4 5 6 7 23 24 25 8
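The query above uses Merge and two streams, while the question asks for CombineLatest over Streams 2..n. One possible way to extend the same gating idea is sketched below. GatedCombine.Build and its parameters are hypothetical names, and the DistinctUntilChanged step is my addition so that a stream is only re-subscribed when its condition actually flips. Note two assumptions: the extra streams are already hot, and CombineLatest produces nothing until every gated stream has emitted at least once. After a gate closes, that stream's last value keeps being used.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class GatedCombine
{
    // Hypothetical helper: each extra stream is paired with a predicate over
    // stream 1 that decides whether that stream is currently "on".
    public static IObservable<IList<long>> Build(
        IObservable<long> stream1,
        IEnumerable<(IObservable<long> Source, Func<long, bool> IsOn)> others)
    {
        return stream1.Publish(s1 =>
        {
            var sources = new List<IObservable<long>> { s1 };
            foreach (var (source, isOn) in others)
            {
                sources.Add(
                    s1.Select(isOn)
                      .DistinctUntilChanged()   // react only when the condition flips
                      .Select(on => on ? source : Observable.Empty<long>())
                      .Switch());
            }
            // Emits a list [stream1, stream2, ..., streamN] of latest values.
            return Observable.CombineLatest(sources);
        });
    }

    public static List<string> Demo()
    {
        var s1 = new Subject<long>();
        var s2 = new Subject<long>();
        var s3 = new Subject<long>();
        var seen = new List<string>();

        var others = new List<(IObservable<long> Source, Func<long, bool> IsOn)>
        {
            (s2, a => a >= 1),      // condition 2
            (s3, a => a % 2 == 0),  // condition 3
        };

        using (Build(s1, others).Subscribe(v => seen.Add(string.Join(",", v))))
        {
            s1.OnNext(0);   // s3 on, s2 off
            s3.OnNext(30);  // stored, but s2 has no value yet
            s1.OnNext(1);   // s2 on, s3 off
            s2.OnNext(20);  // first combined value
            s3.OnNext(31);  // dropped, gate 3 is closed
            s1.OnNext(2);   // s3 on again
            s3.OnNext(32);
        }

        return seen;
    }
}
```

With the values pushed in Demo(), I would expect the combined output to be 1,20,30 then 2,20,30 then 2,20,32. If you need output before every gated stream has produced something, adding a StartWith with a default value to each gated stream is one option.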
Upvotes: 1