Reputation: 83
I want to merge two observables while keeping the result ordered (probably based on a selector). I also want to apply back-pressure to the source observables.
So the selector would choose one of the items to push through to the result, and the other item would wait for the next item to arrive so it can be compared against it.
Src1, Src2, and Result are all of type IObservable<T>.
Src1: { 1,3,6,8,9,10 }
Src2: { 2,4,5,7,11,12 }
Result: 1,2,3,4,5,6,7,8,9,10,11,12
Timeline:
Src1: -1---3----6------8----9-10
Src2: --2-----4---5-7----11---------12
Result: --1--2--3-4-5-6--7-8--9-10-11-12
Is this possible to achieve with the existing .NET Rx methods?
EDIT: Note that the two source observables are guaranteed to be in order.
Example Test:
var source1 = new List<int>() { 1, 4, 6, 7, 8, 10, 14 }.AsEnumerable();
var source2 = new List<int>() { 2, 3, 5, 9, 11, 12, 13, 15 }.AsEnumerable();
var src1 = source1.ToObservable();
var src2 = source2.ToObservable();
var res = src1.SortedMerge(src2, (a, b) =>
{
    if (a <= b)
        return a;
    else
        return b;
});
res.Subscribe((x) => Console.Write($"{x}, "));
DesiredResult: 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
Upvotes: 3
Views: 237
Reputation: 14370
This was fun to do. Had to tweak the algorithm a bit. It could be further improved.
Assumptions:
- There are two streams, streamA and streamB, of the common type T.
- Both streams are sorted: streamA[i] < streamA[i+1] and streamB[i] < streamB[i+1].
- No relationship is assumed between streamA[i] and streamB[i].
- If the two items being compared are equal, the code throws NotImplementedException. This case is easily handled, but I wanted to avoid ambiguity.
- There is a function min for type T.

Here's the algorithm I used:
- Set up two queues, qA and qB.
- When an item arrives from streamA, enqueue it into qA.
- When an item arrives from streamB, enqueue it into qB.
- When there is an item in both qA and qB, compare the top items of the two queues. Remove and emit the min of those two. If both queues are still non-empty, repeat.
- When either streamA or streamB completes, dump the contents of the queues and terminate. Note: This is admittedly lazy, and should probably change to dump, then continue returning the non-completed observable.

Here's the code:
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;

// Extension methods must live in a static class; the class name here is arbitrary.
public static class ObservableExtensions
{
    public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other)
    {
        return SortedMerge(source, other, (a, b) => Enumerable.Min(new[] { a, b }));
    }

    public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other, Func<T, T, T> min)
    {
        return source
            // Tag each item with its source, and turn completion/errors into values.
            .Select(i => (key: 1, value: i)).Materialize()
            .Merge(other.Select(i => (key: 2, value: i)).Materialize())
            // State: the two queues, a pending exception, and the items to emit for this step.
            .Scan((qA: ImmutableQueue<T>.Empty, qB: ImmutableQueue<T>.Empty, exception: (Exception)null, outputMessages: new List<T>()),
                (state, message) =>
                {
                    if (message.Kind == NotificationKind.OnNext)
                    {
                        var key = message.Value.key;
                        var value = message.Value.value;
                        var qA = state.qA;
                        var qB = state.qB;
                        if (key == 1)
                            qA = qA.Enqueue(value);
                        else
                            qB = qB.Enqueue(value);
                        var output = new List<T>();
                        // Emit the smaller head for as long as both queues have one.
                        while (!qA.IsEmpty && !qB.IsEmpty)
                        {
                            var aVal = qA.Peek();
                            var bVal = qB.Peek();
                            var minVal = min(aVal, bVal);
                            // Equal heads are deliberately not handled (see assumptions).
                            if (aVal.Equals(minVal) && bVal.Equals(minVal))
                                throw new NotImplementedException();
                            if (aVal.Equals(minVal))
                            {
                                output.Add(aVal);
                                qA = qA.Dequeue();
                            }
                            else
                            {
                                output.Add(bVal);
                                qB = qB.Dequeue();
                            }
                        }
                        return (qA, qB, null, output);
                    }
                    else if (message.Kind == NotificationKind.OnError)
                    {
                        return (state.qA, state.qB, message.Exception, new List<T>());
                    }
                    else //message.Kind == NotificationKind.OnCompleted
                    {
                        // Dump whatever is still queued and clear both queues.
                        var output = state.qA.Concat(state.qB).ToList();
                        return (ImmutableQueue<T>.Empty, ImmutableQueue<T>.Empty, null, output);
                    }
                })
            .Publish(tuples => Observable.Merge(
                // Regular emissions: at least one queue is still non-empty.
                tuples
                    .Where(t => t.outputMessages.Any() && (!t.qA.IsEmpty || !t.qB.IsEmpty))
                    .SelectMany(t => t.outputMessages
                        .Select(v => Notification.CreateOnNext<T>(v))
                        .ToObservable()
                    ),
                // Final dump: both queues are empty, so emit and then complete.
                tuples
                    .Where(t => t.outputMessages.Any() && t.qA.IsEmpty && t.qB.IsEmpty)
                    .SelectMany(t => t.outputMessages
                        .Select(v => Notification.CreateOnNext<T>(v))
                        .ToObservable()
                        .Concat(Observable.Return(Notification.CreateOnCompleted<T>()))
                    ),
                // Errors are forwarded as OnError notifications.
                tuples
                    .Where(t => t.exception != null)
                    .Select(t => Notification.CreateOnError<T>(t.exception))
            ))
            .Dematerialize();
    }
}
ImmutableQueue comes from System.Collections.Immutable. Scan is required to keep track of state. Materialization is required because of the OnCompleted handling. This is admittedly a complicated solution, but I'm not sure there's a cleaner Rx-centric way.
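The queueing steps can also be tried out without Rx, which may make the logic easier to follow. The following is only a minimal sketch under a few assumptions: the class SortedMergeSketch, the method Replay, and the (key, completed, value) event tuples are hypothetical names made up for this illustration, the events are replayed from a pre-recorded list rather than from live observables, ties go to streamA instead of throwing, and completion follows the "dump, then continue" variation mentioned in the note above rather than terminating early.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only; not part of Rx.
public static class SortedMergeSketch
{
    // Replays an interleaved log of events from two sorted sources.
    // key: 1 = streamA, 2 = streamB; completed: true marks that source's end
    // (the value of a completion event is ignored).
    public static List<T> Replay<T>(IEnumerable<(int key, bool completed, T value)> events)
    {
        var comparer = Comparer<T>.Default;
        var qA = new Queue<T>();
        var qB = new Queue<T>();
        bool doneA = false, doneB = false;
        var output = new List<T>();

        foreach (var e in events)
        {
            if (e.completed)
            {
                if (e.key == 1) doneA = true; else doneB = true;
            }
            else if (e.key == 1) qA.Enqueue(e.value);
            else qB.Enqueue(e.value);

            // While both heads are known, emit the smaller one (ties go to streamA).
            while (qA.Count > 0 && qB.Count > 0)
            {
                if (comparer.Compare(qA.Peek(), qB.Peek()) <= 0) output.Add(qA.Dequeue());
                else output.Add(qB.Dequeue());
            }

            // A completed source with an empty queue can no longer hold anything
            // back, so the other queue is flushed instead of waiting.
            if (doneA && qA.Count == 0) while (qB.Count > 0) output.Add(qB.Dequeue());
            if (doneB && qB.Count == 0) while (qA.Count > 0) output.Add(qA.Dequeue());
        }
        return output;
    }

    public static void Main()
    {
        // The data from the question, in one arbitrary arrival order.
        var events = new List<(int key, bool completed, int value)>
        {
            (1, false, 1), (2, false, 2), (2, false, 3), (1, false, 4),
            (2, false, 5), (1, false, 6), (1, false, 7), (1, false, 8),
            (2, false, 9), (1, false, 10), (2, false, 11), (2, false, 12),
            (2, false, 13), (1, false, 14), (1, true, 0),
            (2, false, 15), (2, true, 0),
        };
        // prints 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
        Console.WriteLine(string.Join(",", Replay(events)));
    }
}
```

Note that 14 stays queued after streamA completes until 15 arrives from streamB, since nothing smaller than 14 has been ruled out on the other side until then.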
Let me know if there's something you want more clarification about.
Upvotes: 1