msauce4

Preserve Sorting While Merging Two Observables

I want to merge two observables and keep the order (probably based on a selector). I also want to apply back-pressure to the source observables.

The selector would choose which of the two current items to push through the observable, and the other item would wait for the next item to arrive so it has something to compare to.

Src1, Src2, and Result are all of type IObservable<T>.

Src1: { 1,3,6,8,9,10 }
Src2: { 2,4,5,7,11,12 }
Result: 1,2,3,4,5,6,7,8,9,10,11,12

Timeline:
Src1:    -1---3----6------8----9-10
Src2:    --2-----4---5-7----11---------12
Result:  --1--2--3-4-5-6--7-8--9-10-11-12
  1. In the example above, src1 emits '1' and is blocked until src2 emits its first item, '2'.
  2. A selector that picks the smaller item is applied, and it selects the item from src1.
  3. Src2 now waits for the next item (from src1) to compare with its current item ('2').
  4. When src1 emits the next item, '3', the selection is run again, this time selecting the item from src2.
  5. This repeats until one of the observables completes. Then, the remaining observable pushes items until it completes.
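The selection rule in the steps above can be sketched as a pull-based analogue over two in-memory sorted sequences instead of observables. `SelectorMerge` is a hypothetical name and not an Rx operator. The sketch does not model timing or back-pressure. It only shows which item the selector lets through at each step, and how the remaining side drains once the other is exhausted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of Rx): merges two individually sorted
// sequences by asking `select` which of the two current heads goes out next.
IEnumerable<T> SelectorMerge<T>(IEnumerable<T> first, IEnumerable<T> second, Func<T, T, T> select)
{
    using var a = first.GetEnumerator();
    using var b = second.GetEnumerator();
    bool hasA = a.MoveNext();
    bool hasB = b.MoveNext();

    // Steps 1-4: while both sides hold a candidate, emit the selected one and
    // advance only that side; the other candidate waits for the next comparison.
    while (hasA && hasB)
    {
        if (EqualityComparer<T>.Default.Equals(select(a.Current, b.Current), a.Current))
        {
            yield return a.Current;
            hasA = a.MoveNext();
        }
        else
        {
            yield return b.Current;
            hasB = b.MoveNext();
        }
    }

    // Step 5: one side is exhausted, so drain whatever remains on the other.
    while (hasA) { yield return a.Current; hasA = a.MoveNext(); }
    while (hasB) { yield return b.Current; hasB = b.MoveNext(); }
}

var src1 = new[] { 1, 3, 6, 8, 9, 10 };
var src2 = new[] { 2, 4, 5, 7, 11, 12 };
var result = SelectorMerge(src1, src2, (x, y) => x <= y ? x : y).ToList();
Console.WriteLine(string.Join(",", result)); // 1,2,3,4,5,6,7,8,9,10,11,12
```

Because this version pulls from enumerators, "blocking" simply means not calling `MoveNext` on the side whose item was not selected. An Rx operator has to buffer that item instead.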

Is this possible to achieve with the existing .NET Rx methods?

EDIT: Note that the two source observables are each guaranteed to be in order.

Example Test:

var source1 = new List<int>() { 1, 4, 6, 7, 8, 10, 14 }.AsEnumerable();
var source2 = new List<int>() { 2, 3, 5, 9, 11, 12, 13, 15 }.AsEnumerable();

var src1 = source1.ToObservable();
var src2 = source2.ToObservable();

var res = src1.SortedMerge(src2, (a, b) =>
    {
       if (a <= b)
           return a;
       else
           return b;
    });

res.Subscribe((x) => Console.Write($"{x}, "));

Desired result: 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15

Answers (1)

Shlomo

This was fun to do. I had to tweak the algorithm a bit, and it could be improved further.

Assumptions:

  1. There are two streams, streamA, streamB of the common type T.
  2. The two streams are separately sorted such that streamA[i] < streamA[i+1] and streamB[i] < streamB[i+1].
  3. You cannot assume any relationship between streamA[i] and streamB[i].
  4. Streams A & B are disjoint: the same element won't be emitted by both. If this happens, I'm throwing NotImplementedException. This case is easily handled, but I wanted to avoid ambiguity.
  5. There is a function min for type T.
  6. No assumptions are made regarding relative velocity of the two streams, but if one is consistently faster than the other, backpressure will be a problem.
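Assumption 5 only needs some `min` for `T`. A minimal sketch of two ways to obtain one follows, assuming `T` either has a default ordering or can be given an explicit `IComparer<T>`. The names `Min`, `MinWith`, and `shorter` are hypothetical illustrations, not part of the answer's code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: a min for any T with a default ordering
// (T implements IComparable<T> or IComparable). Ties return `a`.
T Min<T>(T a, T b) => Comparer<T>.Default.Compare(a, b) <= 0 ? a : b;

// Hypothetical variant for types without a natural ordering: build the
// min delegate from an explicit comparer instead. Ties return `a`.
Func<T, T, T> MinWith<T>(IComparer<T> comparer) =>
    (a, b) => comparer.Compare(a, b) <= 0 ? a : b;

Console.WriteLine(Min(3, 2));            // 2
Console.WriteLine(Min("apple", "b"));    // apple

// Orders strings by length only.
var shorter = MinWith(Comparer<string>.Create((x, y) => x.Length.CompareTo(y.Length)));
Console.WriteLine(shorter("pear", "fig")); // fig
```

A delegate like `shorter` has the `Func<T, T, T>` shape that the answer's second `SortedMerge` overload takes as its `min` argument.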

Here's the algorithm I used:

  • Let there be two queues, qA and qB.
  • When you get an item from streamA, enqueue it into qA.
  • When you get an item from streamB, enqueue it into qB.
  • While there is an item in both qA and qB, compare the top items of the two queues. Remove and emit the min of those two. If both queues are still non-empty, repeat.
  • If either streamA or streamB completes, dump the contents of the queues and terminate. Note: this is admittedly lazy; it should probably dump the queues and then continue relaying the observable that has not yet completed.
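The queue steps above can be sketched outside Rx as a loop over events in arrival order. Each event mimics a materialized notification: (source 1 or 2, isCompleted, value). This sketch assumes the "dump, then continue" variant suggested in the note rather than the lazy terminate-on-first-completion behaviour. Once a source has completed, nothing can undercut the other queue's items, so they are released immediately. Ties are sent from A instead of throwing. `TwoQueueMerge` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, non-Rx sketch of the two-queue algorithm.
List<T> TwoQueueMerge<T>(IEnumerable<(int Source, bool Completed, T Value)> events, Func<T, T, T> min)
{
    var qA = new Queue<T>();
    var qB = new Queue<T>();
    bool doneA = false, doneB = false;
    var output = new List<T>();

    foreach (var e in events)
    {
        if (e.Completed)
        {
            if (e.Source == 1) doneA = true; else doneB = true;
        }
        else
        {
            (e.Source == 1 ? qA : qB).Enqueue(e.Value);
        }

        // While both queues hold an item, compare the heads and emit the min.
        while (qA.Count > 0 && qB.Count > 0)
        {
            var a = qA.Peek();
            var b = qB.Peek();
            if (EqualityComparer<T>.Default.Equals(min(a, b), a))
                output.Add(qA.Dequeue());
            else
                output.Add(qB.Dequeue());
        }

        // A completed source can no longer produce a smaller item,
        // so the other side's pending items are safe to release.
        if (doneA) while (qB.Count > 0) output.Add(qB.Dequeue());
        if (doneB) while (qA.Count > 0) output.Add(qA.Dequeue());
    }
    return output;
}

// Arrival order taken from the question's timeline; Src1 is assumed to
// complete after emitting 10, before Src2 emits 12.
var events = new List<(int, bool, int)>
{
    (1, false, 1), (2, false, 2), (1, false, 3), (2, false, 4), (1, false, 6),
    (2, false, 5), (2, false, 7), (1, false, 8), (2, false, 11), (1, false, 9),
    (1, false, 10), (1, true, 0), (2, false, 12), (2, true, 0),
};
var merged = TwoQueueMerge(events, (x, y) => Math.Min(x, y));
Console.WriteLine(string.Join(",", merged)); // 1,2,3,4,5,6,7,8,9,10,11,12
```

With the lazy behaviour, the same event sequence would stop after dumping 11 when Src1 completes, and 12 would never be emitted.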

Here's the code:

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;

public static class ObservableExtensions
{
    // Overload that relies on the default ordering of T.
    public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other)
    {
        return SortedMerge(source, other, (a, b) => Enumerable.Min(new[] { a, b }));
    }

    public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other, Func<T, T, T> min)
    {
        return source
            // Tag each item with its origin (1 = source, 2 = other) and materialize,
            // so OnError/OnCompleted reach Scan as ordinary values.
            .Select(i => (key: 1, value: i)).Materialize()
            .Merge(other.Select(i => (key: 2, value: i)).Materialize())
            // State: the two pending queues, a captured error, and the items to emit for this step.
            .Scan((qA: ImmutableQueue<T>.Empty, qB: ImmutableQueue<T>.Empty, exception: (Exception)null, outputMessages: new List<T>()),
                (state, message) =>
            {
                if (message.Kind == NotificationKind.OnNext)
                {
                    var key = message.Value.key;
                    var value = message.Value.value;
                    var qA = state.qA;
                    var qB = state.qB;
                    if (key == 1)
                        qA = qA.Enqueue(value);
                    else
                        qB = qB.Enqueue(value);
                    var output = new List<T>();
                    // Emit the smaller head while both queues hold an item.
                    while (!qA.IsEmpty && !qB.IsEmpty)
                    {
                        var aVal = qA.Peek();
                        var bVal = qB.Peek();
                        var minVal = min(aVal, bVal);
                        // Equal heads violate assumption 4 (disjoint streams).
                        if (aVal.Equals(minVal) && bVal.Equals(minVal))
                            throw new NotImplementedException();

                        if (aVal.Equals(minVal))
                        {
                            output.Add(aVal);
                            qA = qA.Dequeue();
                        }
                        else
                        {
                            output.Add(bVal);
                            qB = qB.Dequeue();
                        }
                    }
                    return (qA, qB, null, output);
                }
                else if (message.Kind == NotificationKind.OnError)
                {
                    return (state.qA, state.qB, message.Exception, new List<T>());
                }
                else // message.Kind == NotificationKind.OnCompleted
                {
                    // Lazy completion: dump whatever is queued and finish.
                    var output = state.qA.Concat(state.qB).ToList();
                    return (ImmutableQueue<T>.Empty, ImmutableQueue<T>.Empty, null, output);
                }
            })
            .Publish(tuples => Observable.Merge(
                // Ordinary step: at least one queue still holds a pending item.
                // SelectMany over an IEnumerable emits the batch synchronously, in order.
                tuples
                    .Where(t => t.outputMessages.Any() && (!t.qA.IsEmpty || !t.qB.IsEmpty))
                    .SelectMany(t => t.outputMessages
                        .Select(v => Notification.CreateOnNext<T>(v))),
                // Completion step: both queues were dumped, so emit the items, then complete.
                tuples
                    .Where(t => t.outputMessages.Any() && t.qA.IsEmpty && t.qB.IsEmpty)
                    .SelectMany(t => t.outputMessages
                        .Select(v => Notification.CreateOnNext<T>(v))
                        .Concat(new[] { Notification.CreateOnCompleted<T>() })),
                // Forward errors.
                tuples
                    .Where(t => t.exception != null)
                    .Select(t => Notification.CreateOnError<T>(t.exception))
            ))
            .Dematerialize();
    }
}

ImmutableQueue<T> comes from System.Collections.Immutable. Scan is required to keep track of state. Materialization is required so that OnCompleted (and OnError) reach Scan as ordinary values that can be handled alongside the items. This is admittedly a complicated solution, but I'm not sure there's a cleaner Rx-centric way.
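To see what the three `Where` branches inside `Publish` do with each Scan state, here is a hypothetical pure-function restatement. `Route` is an illustrative name, and the notifications are rendered as strings only for readability. One state goes in, and zero or more notifications come out. OnCompleted is only produced by the completion step, because that is the only step that leaves both queues empty with output pending.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical restatement of the Publish/Merge routing: one Scan state in,
// zero or more notifications out (as strings, for illustration only).
IEnumerable<string> Route<T>(bool qAEmpty, bool qBEmpty, Exception exception, List<T> output)
{
    if (exception != null)
        return new[] { $"OnError({exception.Message})" };                 // error branch
    if (output.Count > 0 && qAEmpty && qBEmpty)
        return output.Select(v => $"OnNext({v})").Append("OnCompleted");  // completion branch
    if (output.Count > 0)
        return output.Select(v => $"OnNext({v})");                        // ordinary step
    return Enumerable.Empty<string>();                                     // nothing to emit
}

foreach (var n in Route(true, true, null, new List<int> { 11, 12 }))
    Console.WriteLine(n);
// OnNext(11)
// OnNext(12)
// OnCompleted
```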

Let me know if there's something you want more clarification about.
