David

Reputation: 1841

Combine overlapping observable streams but take most recent values

I have a use case involving combining streams with Rx that I've been trying to solve. I have three streams emitting values:

S1: ----1----2----3----4----5-----6
S2: ----a---------c---------d------
S3: ---------x---------y----------z

I want to zip S1 with S2 and S1 with S3. S2 and S3 never emit a value at the same time.

So the desired output is:

1,a
2,x
3,c
4,y
5,d
6,z

But what is actually happening is:

1,a
1,x
2,c
2,y
3,d
3,z

Any clues as to how I'd get the desired output?

Code:

Subject<int> s1 = new Subject<int>();
Subject<string> s2 = new Subject<string>();
Subject<string> s3 = new Subject<string>();

var zip1 = s1.Zip(s2, (x, y) => {
    return new Tuple<int, string>(x, y);
});

var zip2 = s1.Zip(s3, (x, y) => {
    return new Tuple<int, string>(x, y);
});

zip1.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));
zip2.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));

s1.OnNext(1);
s2.OnNext("a");

s1.OnNext(2);
s3.OnNext("x");

s1.OnNext(3);
s2.OnNext("c");

s1.OnNext(4);
s3.OnNext("y");

s1.OnNext(5);
s2.OnNext("d");

s1.OnNext(6);
s3.OnNext("z");

Upvotes: 0

Views: 79

Answers (2)

Enigmativity

Reputation: 117175

Is this not just a case of doing this?

var query =
    s1.Zip(s2.Merge(s3),
        (x, y) => new Tuple<int, string>(x, y));

Given the code in your question, this produces the desired pairs: (1, a), (2, x), (3, c), (4, y), (5, d), (6, z).
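For anyone who wants to run it, here is a minimal self-contained sketch that replays the question's sequence of OnNext calls against the merged-then-zipped query. It assumes the System.Reactive NuGet package is referenced; the class name ZipMergeDemo and the Run helper are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; only the Zip/Merge query comes from the answer.
public static class ZipMergeDemo
{
    // Replays the question's OnNext sequence and returns the emitted pairs as "n,s" strings.
    public static List<string> Run()
    {
        var s1 = new Subject<int>();
        var s2 = new Subject<string>();
        var s3 = new Subject<string>();
        var results = new List<string>();

        // Merge s2 and s3 first, so s1 is zipped against a single combined stream.
        var query = s1.Zip(s2.Merge(s3), (x, y) => new Tuple<int, string>(x, y));
        query.Subscribe(pair => results.Add($"{pair.Item1},{pair.Item2}"));

        s1.OnNext(1); s2.OnNext("a");
        s1.OnNext(2); s3.OnNext("x");
        s1.OnNext(3); s2.OnNext("c");
        s1.OnNext(4); s3.OnNext("y");
        s1.OnNext(5); s2.OnNext("d");
        s1.OnNext(6); s3.OnNext("z");

        return results;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Because there is only one Zip, each s1 value is consumed exactly once, which is what the two separate Zip calls in the question were missing.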

Upvotes: 1

Servy

Reputation: 203838

For each of the streams s2 and s3, you can create a Subject, subscribe to the original stream, and then pass each value on to the new Subject after awaiting the next value of s1:

var zip1 = new Subject<Tuple<int, string>>();
s2.Subscribe(async value =>
{
    var other = await s1.FirstAsync();
    zip1.OnNext(Tuple.Create(other, value));
});

var zip2 = new Subject<Tuple<int, string>>();
s3.Subscribe(async value =>
{
    var other = await s1.FirstAsync();
    zip2.OnNext(Tuple.Create(other, value));
});

You've stated that you're going to manage the timing of the streams so that s1 yields one value for each value of the other streams combined. If you do manage that, you'll be fine. Note that FirstAsync subscribes to s1 only once the s2 or s3 value has arrived, so each value is paired with the next s1 value emitted after it. If s1 yields multiple values without any corresponding value in another stream, the extra values are ignored, and if the other streams combined yield multiple values without a corresponding value in s1, the next value yielded by s1 is repeated for each of them.

There is another option that's a bit more contrived but more tolerant of timing differences between values. The approach is to first project each stream into something that holds the value and also identifies which stream it came from. The tagged streams can then be merged and zipped with s1, and the result can be split into multiple streams by filtering on the tag. Each of these then needs to be projected back to the original values.

var zip = s2.Select(s => Tuple.Create(1, s))
    .Merge(s3.Select(s => Tuple.Create(2, s)))
    .Zip(s1, Tuple.Create);

var zip1 = zip.Where(pair => pair.Item1.Item1 == 1)
    .Select(pair => Tuple.Create(pair.Item2, pair.Item1.Item2));
var zip2 = zip.Where(pair => pair.Item1.Item1 == 2)
    .Select(pair => Tuple.Create(pair.Item2, pair.Item1.Item2));
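As a rough sketch of how this behaves with the sequence from the question, the following wires up the tagged approach and records which of the two output streams each pair arrives on. It assumes the System.Reactive NuGet package is referenced; the class name TaggedZipDemo, the Run helper, and the "zip1:"/"zip2:" prefixes are made up for illustration, and the Zip selector is written as a lambda rather than the Tuple.Create method group.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; the query shape follows the tagged-merge approach above.
public static class TaggedZipDemo
{
    // Replays the question's OnNext sequence and returns each pair, prefixed by its output stream.
    public static List<string> Run()
    {
        var s1 = new Subject<int>();
        var s2 = new Subject<string>();
        var s3 = new Subject<string>();
        var results = new List<string>();

        // Tag values from s2 with 1 and values from s3 with 2, merge them, then zip with s1.
        var zip = s2.Select(s => Tuple.Create(1, s))
            .Merge(s3.Select(s => Tuple.Create(2, s)))
            .Zip(s1, (tagged, n) => Tuple.Create(tagged, n));

        // Split by tag and project back to (s1 value, original string).
        var zip1 = zip.Where(pair => pair.Item1.Item1 == 1)
            .Select(pair => Tuple.Create(pair.Item2, pair.Item1.Item2));
        var zip2 = zip.Where(pair => pair.Item1.Item1 == 2)
            .Select(pair => Tuple.Create(pair.Item2, pair.Item1.Item2));

        zip1.Subscribe(p => results.Add($"zip1: {p.Item1},{p.Item2}"));
        zip2.Subscribe(p => results.Add($"zip2: {p.Item1},{p.Item2}"));

        s1.OnNext(1); s2.OnNext("a");
        s1.OnNext(2); s3.OnNext("x");
        s1.OnNext(3); s2.OnNext("c");
        s1.OnNext(4); s3.OnNext("y");
        s1.OnNext(5); s2.OnNext("d");
        s1.OnNext(6); s3.OnNext("z");

        return results;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Each subscription to zip1 and zip2 subscribes to zip separately, so the merge and zip run once per subscriber. With Subjects as the sources, both copies see the same values and stay in step.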

If you don't actually need separate streams, and only want to replicate the output as a single stream, it's actually much simpler:

var output = s1.Zip(s2.Merge(s3), Tuple.Create);
output.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));

Upvotes: 1
