Reputation: 652
I recently got back into programming with Rx and ran into a problem combining two sequences.
I've got sequence o1 which looks like this:
var o1 = Observable.Interval(TimeSpan.FromSeconds(7))
.Select(i => i + 2)
.Take(2)
.StartWith(1);
And I have sequence o2 which looks like this:
var o2 = Observable.Interval(TimeSpan.FromSeconds(3))
.Delay(TimeSpan.FromSeconds(1))
.Select(i => i + 2)
.Take(4)
.StartWith(1);
That roughly corresponds to this marble diagram:
o1: 1 - - - - - - 2 - - - - - - 3 -
o2: - a - - b - - c - - d - - e - -
o3: - 1a- - - - - 2c- - - - - - 3e-
I'm just looking for sequence o3 but I can't seem to work it out. Neither Zip nor CombineLatest on its own is up to the job.
Upvotes: 1
Views: 909
Reputation: 117029
This works as expected for the two example source observables:
IObservable<long> o1 =
Observable
.Interval(TimeSpan.FromSeconds(7))
.Select(i => i + 2)
.Take(2)
.StartWith(1);
IObservable<char> o2 =
Observable
.Interval(TimeSpan.FromSeconds(3))
.Delay(TimeSpan.FromSeconds(1))
.Select(i => i + 2)
.Take(4)
.StartWith(1)
.Select(x => (char)('a' + x - 1));
// Join pairs values whose duration windows overlap; here every value stays "open" for 2 seconds.
IObservable<string> o3 =
from x1 in o1
join x2 in o2
on Observable.Timer(TimeSpan.FromSeconds(2.0))
equals Observable.Timer(TimeSpan.FromSeconds(2.0))
select $"{x1}{x2}";
o3.Subscribe(Console.WriteLine);
The output I get is:
1a
2c
3e
Upvotes: 0
Reputation: 43409
You are probably searching for the WithLatestFrom operator.
Merges two observable sequences into one observable sequence by combining each element from the first source with the latest element from the second source, if any. Starting from Rx.NET 4.0, this will subscribe to second before subscribing to first, to have a latest element readily available in case first emits an element right away.
In its simplest form it has this signature:
public static IObservable<(TFirst First, TSecond Second)> WithLatestFrom<TFirst, TSecond>(
this IObservable<TFirst> first, IObservable<TSecond> second);
The resulting sequence emits elements of type ValueTuple<TFirst,TSecond>.
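To show how this maps onto the question's marble diagram, here is a minimal sketch that replays the diagram with subjects instead of timers, so the ordering is explicit and repeatable. It uses the overload that takes a result selector rather than the tuple-returning one. The class name WithLatestFromSketch and the method Run are made up for illustration, and the order in which values are pushed is an assumption: 'a' is taken to arrive before 1, and 'c' before 2. With the real timers those two pairs fire at nearly the same instant, so the actual pairing may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper class, not part of Rx.NET.
public static class WithLatestFromSketch
{
    public static List<string> Run()
    {
        // Subjects stand in for the timer-based o1 and o2 from the question.
        var o1 = new Subject<long>();
        var o2 = new Subject<char>();
        var results = new List<string>();

        // o1 drives the output; o2 only supplies its most recent value.
        using var subscription = o1
            .WithLatestFrom(o2, (x1, x2) => $"{x1}{x2}")
            .Subscribe(results.Add);

        // Assumed emission order, following the marble diagram.
        o2.OnNext('a');
        o1.OnNext(1);   // pairs with 'a'
        o2.OnNext('b'); // no output: o2 on its own never triggers an element
        o2.OnNext('c');
        o1.OnNext(2);   // pairs with 'c'
        o2.OnNext('d');
        o2.OnNext('e');
        o1.OnNext(3);   // pairs with 'e'

        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "1a, 2c, 3e"
    }
}
```

With the original observables the call would be o1.WithLatestFrom(o2, (x1, x2) => ...) in the same way. Only o1 produces output, which is why 'b' and 'd' never show up in the result.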
Upvotes: 3
Reputation: 19546
You can use a combination of CombineLatest(), Buffer(), Where() and Select() to build the observable o3:
CombineLatest() - Simply combine the two observables o1 and o2 into any data structure you want (I used a Tuple).
Buffer(2,1) - Build a "sliding window" to see the previous and current Tuple.
Where() - Filter the sliding windows so that you only keep those where the first element (from o1) of the previous Tuple differs from the first element (again from o1) of the current Tuple. That way you know o1 changed, regardless of what o2 is doing in the meantime.
Select() - Simply select the current (or previous) Tuple.
The observable might look like this:
var o1 = Observable.Interval(TimeSpan.FromSeconds(7))
.Select(i => i + 2)
.Take(2)
.StartWith(1)
.Do(it => {
Console.WriteLine("-- o1 triggered: "+it);
});
var o2 = Observable.Interval(TimeSpan.FromSeconds(3))
.Delay(TimeSpan.FromSeconds(1))
.Select(i => i + 2)
.Take(4)
.StartWith(1)
.Do(it => {
Console.WriteLine("-- o2 triggered: "+it);
});
o1.CombineLatest(o2, Tuple.Create)
.StartWith(Tuple.Create(0L, 0L))
.Buffer(2, 1)
.Do(it => {
Console.WriteLine("-- After Buffer: "+String.Join(",",it));
})
.Where(it => {
if (it.Count != 2) {
return false;
}
return it[0].Item1 != it[1].Item1;
})
.Select(it => it[1])
.Subscribe(it => {
Console.WriteLine("Final: "+it);
});
This will generate the following output:
-- o1 triggered: 1
-- o2 triggered: 1
-- After Buffer: (0, 0),(1, 1)
Final: (1, 1)
-- o2 triggered: 2
-- After Buffer: (1, 1),(1, 2)
-- o1 triggered: 2
-- After Buffer: (1, 2),(2, 2)
Final: (2, 2)
-- o2 triggered: 3
-- After Buffer: (2, 2),(2, 3)
-- o2 triggered: 4
-- After Buffer: (2, 3),(2, 4)
-- o2 triggered: 5
-- After Buffer: (2, 4),(2, 5)
-- o1 triggered: 3
-- After Buffer: (2, 5),(3, 5)
Final: (3, 5)
-- After Buffer: (3, 5)
You might need to adjust/add/remove the StartWith() calls depending on your actual requirement and/or change the Select() call to get the previous or current Tuple.
Upvotes: 1