Reputation: 1841
I have a use case involving combining streams with Rx that I've been struggling with. I have three streams emitting values:
S1: ----1----2----3----4----5-----6
S2: ----a---------c---------d------
S3: ---------x---------y----------z
I want to combine (zip) streams 1 & 2 and streams 1 & 3, on the assumption that streams 2 & 3 never output a value at the same time.
So the desired output is:
1,a
2,x
3,c
4,y
5,d
6,z
But what is actually happening is:
1,a
1,x
2,c
2,y
3,d
3,z
Any clues as to how I'd get the desired output?
Code:
Subject<int> s1 = new Subject<int>();
Subject<string> s2 = new Subject<string>();
Subject<string> s3 = new Subject<string>();
var zip1 = s1.Zip(s2, (x, y) => {
    return new Tuple<int, string>(x, y);
});
var zip2 = s1.Zip(s3, (x, y) => {
    return new Tuple<int, string>(x, y);
});
zip1.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));
zip2.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));
s1.OnNext(1);
s2.OnNext("a");
s1.OnNext(2);
s3.OnNext("x");
s1.OnNext(3);
s2.OnNext("c");
s1.OnNext(4);
s3.OnNext("y");
s1.OnNext(5);
s2.OnNext("d");
s1.OnNext(6);
s3.OnNext("z");
Upvotes: 0
Views: 79
Reputation: 117175
Is this not just a case of doing this?
var query =
    s1.Zip(s2.Merge(s3),
        (x, y) => new Tuple<int, string>(x, y));
Subscribing to query with Console.WriteLine and running the code from your question, I get this result:
(1, a)
(2, x)
(3, c)
(4, y)
(5, d)
(6, z)
Upvotes: 1
Reputation: 203838
For each of the streams that you have, s2 and s3, you can create a Subject, subscribe to the original stream, and then pass the value on to the new Subject after awaiting the next value of s1:
var zip1 = new Subject<Tuple<int, string>>();
s2.Subscribe(async value =>
{
    var other = await s1.FirstAsync();
    zip1.OnNext(Tuple.Create(other, value));
});
var zip2 = new Subject<Tuple<int, string>>();
s3.Subscribe(async value =>
{
    var other = await s1.FirstAsync();
    zip2.OnNext(Tuple.Create(other, value));
});
You've stated that you're going to manage the timings of the streams such that one value is yielded from s1 for each value of the other streams combined. If you do manage that, and each value from s2 or s3 arrives before the s1 value it should be paired with (s1 is a Subject, so FirstAsync only sees values emitted after the await begins), you'll be fine. If multiple values are yielded from s1 without any corresponding value in another stream, the extra values will be ignored, and if multiple values are yielded from the other streams combined without a corresponding value in s1, then the next value yielded out of s1 will be repeated for each of them.
There is another option that's a bit more contrived, but that is more flexible in the timings between values. This approach is to first project each stream into something that holds the value but that distinguishes it from the other streams. The streams can then be merged, zipped with s1, and then multiple streams can be created that filter out the values for each particular stream. These then will need to be projected back to the original values.
var zip = s2.Select(s => Tuple.Create(1, s))
    .Merge(s3.Select(s => Tuple.Create(2, s)))
    .Zip(s1, Tuple.Create);
var zip1 = zip.Where(pair => pair.Item1.Item1 == 1)
    .Select(pair => Tuple.Create(pair.Item2, pair.Item1.Item2));
var zip2 = zip.Where(pair => pair.Item1.Item1 == 2)
    .Select(pair => Tuple.Create(pair.Item2, pair.Item1.Item2));
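As a rough check of the tag, merge, zip, and filter approach, here is a minimal console sketch that replays the emission order from the question. The class name SplitZipDemo, the Run helper, and the log list are illustrative names only, and it assumes the System.Reactive NuGet package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;      // assumption: System.Reactive NuGet package
using System.Reactive.Subjects;

public static class SplitZipDemo
{
    // Hypothetical helper: replays the question's emissions and records
    // what each of the two split streams produces.
    public static List<string> Run()
    {
        var s1 = new Subject<int>();
        var s2 = new Subject<string>();
        var s3 = new Subject<string>();

        // Tag each value with its source (1 = s2, 2 = s3), merge, then zip with s1.
        var zip = s2.Select(s => Tuple.Create(1, s))
            .Merge(s3.Select(s => Tuple.Create(2, s)))
            .Zip(s1, (tagged, n) => Tuple.Create(tagged, n));

        // Split by tag and project back to (s1 value, original value).
        var zip1 = zip.Where(pair => pair.Item1.Item1 == 1)
            .Select(pair => Tuple.Create(pair.Item2, pair.Item1.Item2));
        var zip2 = zip.Where(pair => pair.Item1.Item1 == 2)
            .Select(pair => Tuple.Create(pair.Item2, pair.Item1.Item2));

        var log = new List<string>();
        zip1.Subscribe(t => log.Add("zip1: " + t));
        zip2.Subscribe(t => log.Add("zip2: " + t));

        // Same emission order as in the question.
        s1.OnNext(1); s2.OnNext("a");
        s1.OnNext(2); s3.OnNext("x");
        s1.OnNext(3); s2.OnNext("c");
        s1.OnNext(4); s3.OnNext("y");
        s1.OnNext(5); s2.OnNext("d");
        s1.OnNext(6); s3.OnNext("z");

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
        // Prints:
        // zip1: (1, a)
        // zip2: (2, x)
        // zip1: (3, c)
        // zip2: (4, y)
        // zip1: (5, d)
        // zip2: (6, z)
    }
}
```

Each Subscribe call here builds its own copy of the zip pipeline over the shared subjects. That works because the subjects are hot and both copies see the same emissions.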
If you don't actually need separate streams, and only want to replicate the output as a single stream, it's actually much simpler:
var output = s1.Zip(s2.Merge(s3), Tuple.Create);
output.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));
Upvotes: 1