Aaron
Aaron

Reputation: 652

Combining Two Observable Sequences

I recently got back into programming with RX and came up with a problem combining two sequences.

I've got sequence o1 which looks like this:

var o1 = Observable.Interval(TimeSpan.FromSeconds(7))
                   .Select(i => i + 2)
                   .Take(2)
                   .StartWith(1);

And I have sequence o2 which looks like this:

var o2 = Observable.Interval(TimeSpan.FromSeconds(3))
                   .Delay(TimeSpan.FromSeconds(1))
                   .Select(i => i + 2)
                   .Take(4)
                   .StartWith(1);

That roughly corresponds to this marble diagram:

o1: 1 - - - - - - 2 - - - - - - 3 -
o2: - a - - b - - c - - d - - e - -
o3: - 1a- - - - - 2c- - - - - - 3e-

I'm just looking for sequence o3 but I can't seem to work it out. Neither Zip nor CombineLatest on their own are up to the job.

Upvotes: 1

Views: 909

Answers (3)

Enigmativity
Enigmativity

Reputation: 117029

This works as expected for the two example source observables:

IObservable<long>  o1 =
    Observable
        .Interval(TimeSpan.FromSeconds(7))
        .Select(i => i + 2)
        .Take(2)
        .StartWith(1);

IObservable<char> o2 =
    Observable
        .Interval(TimeSpan.FromSeconds(3))
        .Delay(TimeSpan.FromSeconds(1))
        .Select(i => i + 2)
        .Take(4)
        .StartWith(1)
        .Select(x => (char)('a' + x - 1));

IObservable<string> o3 =
    from x1 in o1
    join x2 in o2
        on Observable.Timer(TimeSpan.FromSeconds(2.0))
        equals Observable.Timer(TimeSpan.FromSeconds(2.0))
    select $"{x1}{x2}";

o3.Subscribe(Console.WriteLine);

The output I get is:

1a
2c
3e

Upvotes: 0

Theodor Zoulias
Theodor Zoulias

Reputation: 43409

You are probably searching for the WithLatestFrom operator.

Merges two observable sequences into one observable sequence by combining each element from the first source with the latest element from the second source, if any. Starting from Rx.NET 4.0, this will subscribe to second before subscribing to first, to have a latest element readily available in case first emits an element right away.

In its simplest form it has this signature:

public static IObservable<(TFirst First, TSecond Second)> WithLatestFrom<TFirst, TSecond>(
    this IObservable<TFirst> first, IObservable<TSecond> second);

The resulting sequence emits elements of type ValueTuple<TFirst,TSecond>.

Upvotes: 3

Progman
Progman

Reputation: 19546

You can use a combination of CombineLatest(), Buffer(), Where() and Select() to build the observable o3:

  • CombineLatest() - Simply combine the two observables o1 and o2 to any data structure you want (I used a Tuple).
  • Buffer(2,1) - Build a "sliding window" to see the previous and current Tuple.
  • Where() - Filter the "sliding window", that you only get a sliding window where the first element (from o1) of the previous Tuple is different to the first element (again from o1) of the current Tuple, so you know there was a change, regardless of what o2 is doing in the meantime.
  • Select() - Simply select the current (or previous) Tuple.

The observable might look like this:

var o1 = Observable.Interval(TimeSpan.FromSeconds(7))
       .Select(i => i + 2)
       .Take(2)
       .StartWith(1)
       .Do(it => {
           Console.WriteLine("-- o1 triggered: "+it);
       });
var o2 = Observable.Interval(TimeSpan.FromSeconds(3))
       .Delay(TimeSpan.FromSeconds(1))
       .Select(i => i + 2)
       .Take(4)
       .StartWith(1)
       .Do(it => {
           Console.WriteLine("-- o2 triggered: "+it);
       });
o1.CombineLatest(o2, Tuple.Create)
    .StartWith(Tuple.Create(0L, 0L))
    .Buffer(2, 1)
    .Do(it => {
        Console.WriteLine("-- After Buffer: "+String.Join(",",it));
    })
    .Where(it => {
        if (it.Count != 2) {
            return false;
        }
        return it[0].Item1 != it[1].Item1;
    })
    .Select(it => it[1])
    .Subscribe(it => {
        Console.WriteLine("Final: "+it);
    });

This will generate the following output:

-- o1 triggered: 1
-- o2 triggered: 1
-- After Buffer: (0, 0),(1, 1)
Final: (1, 1)
-- o2 triggered: 2
-- After Buffer: (1, 1),(1, 2)
-- o1 triggered: 2
-- After Buffer: (1, 2),(2, 2)
Final: (2, 2)
-- o2 triggered: 3
-- After Buffer: (2, 2),(2, 3)
-- o2 triggered: 4
-- After Buffer: (2, 3),(2, 4)
-- o2 triggered: 5
-- After Buffer: (2, 4),(2, 5)
-- o1 triggered: 3
-- After Buffer: (2, 5),(3, 5)
Final: (3, 5)
-- After Buffer: (3, 5)

You might need to adjust/add/remove the StartWith() calls depending on your actual requirement and/or change the Select() call to get the previous or current Tuple.

Upvotes: 1

Related Questions