blaster
blaster

Reputation: 8937

Can I Pair Two Sequences Together by a Matching Key?

Let's say sequence one is going out to the web to retrieve the contents of sites 1, 2, 3, 4, 5 (but will return in unpredictable order).

Sequence two is going to a database to retrieve context about these same records 1, 2, 3, 4, 5 (but for the purposes of this example will return in unpredictable order).

Is there an Rx extension method that will combine these into one sequence when each matching pair is ready in both sequences? Ie, if the first sequence returns in the order 4,2,3,5,1 and the second sequence returns in the order 1,4,3,2,5, the merged sequence would be (4,4), (3,3), (2,2), (1,1), (5,5) - as soon as each pair is ready. I've looked at Merge and Zip but they don't seem to be exactly what I'm looking for.

I wouldn't want to discard pairs that don't match, which I think rules out a simple .Where.Select combination.

Upvotes: 5

Views: 269

Answers (2)

Matthew Finlay
Matthew Finlay

Reputation: 3464

var paired = Observable
    .Merge(aSource, bSource)
    .GroupBy(i => i)
    .SelectMany(g => g.Buffer(2).Take(1));

The test below gives the correct results. It's just taking ints at the moment, if you're using data with keys and values, then you'll need to group by i.Key instead of i.

var aSource = new Subject<int>();
var bSource = new Subject<int>();

paired.Subscribe(g => Console.WriteLine("{0}:{1}", g.ElementAt(0), g.ElementAt(1)));

aSource.OnNext(4);
bSource.OnNext(1);
aSource.OnNext(2);
bSource.OnNext(4);
aSource.OnNext(3);
bSource.OnNext(3);
aSource.OnNext(5);
bSource.OnNext(2);
aSource.OnNext(1);
bSource.OnNext(5);

yields:

4:4
3:3
2:2
1:1
5:5

Edit in response to Brandon:

For the situation where the items are different classes (AClass and BClass), the following adjustment can be made.

using Pair = Tuple<AClass, BClass>;

var paired = Observable
    .Merge(aSource.Select(a => new Pair(a, null)), bSource.Select(b => new Pair(null, b)))
    .GroupBy(p => p.Item1 != null ? p.Item1.Key : p.Item2.Key)
    .SelectMany(g => g.Buffer(2).Take(1))
    .Select(g => new Pair(
      g.ElementAt(0).Item1 ?? g.ElementAt(1).Item1, 
      g.ElementAt(0).Item2 ?? g.ElementAt(1).Item2));

Upvotes: 3

Brandon
Brandon

Reputation: 39182

So you have 2 observable sequences that you want to pair together?

Pair from Rxx along with GroupBy can help here. I think code similar to the following might do what you want

var pairs = stream1.Pair(stream2)
       .GroupBy(pair => pair.Switch(source1 => source1.Key, source2 => source2.Key))
       .SelectMany(group => group.Take(2).ToArray()) // each group will have at most 2 results (1 left and 1 right)
       .Select(pair =>
       {
           T1 result1 = default(T1);
           T2 result2 = default(T2);

           foreach (var r in pair)
           {
              if (r.IsLeft) result1 = r.Left;
              else result2 = r.Right;
           }

           return new { result1, result2 };
       });

```

I've not tested it, and not added in anything for error handling, but I think this is what you want.

Upvotes: 1

Related Questions