Reputation:
var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
var zip = a.Zip(b, (x, y) => x + "-" + y);
zip.Subscribe(Console.WriteLine);
Prints:
0-5
1-6
2-7
...
Instead, I would like to join identical values:
5-5
6-6
7-7
8-8
...
This is a simplified example of the problem of merging hundreds of ordered asynchronous sequences. It is very easy to join two IEnumerables, but I could not find a way to do something like this in Rx. Any ideas?
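For reference, the "easy" pull-based version is a classic merge join: walk both sorted sequences with two enumerators and advance whichever side has the smaller head. The sketch below assumes both inputs are sorted ascending. `SortedJoin.MergeJoin` is a hypothetical helper, not part of LINQ; LINQ's own `Enumerable.Join` is a hash join and does not rely on sorted input.

```csharp
using System;
using System.Collections.Generic;

public static class SortedJoin
{
    // Merge join of two ascending sequences: emits selector(x, y) for each key present in both.
    // Assumes both inputs are sorted ascending.
    public static IEnumerable<string> MergeJoin(
        IEnumerable<int> left, IEnumerable<int> right, Func<int, int, string> selector)
    {
        using (var l = left.GetEnumerator())
        using (var r = right.GetEnumerator())
        {
            bool hasL = l.MoveNext();
            bool hasR = r.MoveNext();
            while (hasL && hasR)
            {
                if (l.Current == r.Current)
                {
                    yield return selector(l.Current, r.Current);
                    hasL = l.MoveNext();
                    hasR = r.MoveNext();
                }
                else if (l.Current < r.Current)
                {
                    hasL = l.MoveNext();   // left head can never match: skip it
                }
                else
                {
                    hasR = r.MoveNext();   // right head can never match: skip it
                }
            }
        }
    }
}
```

With the question's inputs (0..9 and 5..14), this yields 5-5 through 9-9. The Rx difficulty is that both sides are pushed rather than pulled, so there is no `MoveNext` to call on whichever side is lagging.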
More about the inputs and what I am trying to achieve: the whole system is a real-time pipeline with multiple state machines (aggregators, buffers, smoothing filters, etc.) connected in a fork-join pattern. Is Rx a good fit for implementing such things? Each input can be represented as:
public struct DataPoint
{
    public double Value;
    public DateTimeOffset Timestamp;
}
Each input data point is timestamped on arrival, so all events are naturally ordered by their join key (the timestamp). As the events travel through the pipeline they get forked and joined. Joins need to be correlated by timestamp and applied in a predefined order. For example, join(a,b,c,d) => join(join(join(a,b),c),d).
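The nested join described above is a left fold of a pairwise timestamp join. A minimal pull-based sketch over in-memory sequences, assuming every input is ordered by `Timestamp` with no duplicate timestamps; `TimestampJoin`, `JoinStep` and `JoinAll` are hypothetical names, and `DataPoint` is redeclared with the question's shape so the block compiles on its own. It only illustrates the join order and correlation rule, not an Rx implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same shape as the question's DataPoint.
public struct DataPoint
{
    public double Value;
    public DateTimeOffset Timestamp;
}

public static class TimestampJoin
{
    // One pairwise join: merge the accumulated rows with the next input on Timestamp.
    // Assumes both sequences are ordered by Timestamp with no duplicate timestamps.
    static IEnumerable<(DateTimeOffset Timestamp, double[] Values)> JoinStep(
        IEnumerable<(DateTimeOffset Timestamp, double[] Values)> acc,
        IEnumerable<DataPoint> next)
    {
        using (var l = acc.GetEnumerator())
        using (var r = next.GetEnumerator())
        {
            bool hasL = l.MoveNext(), hasR = r.MoveNext();
            while (hasL && hasR)
            {
                int cmp = l.Current.Timestamp.CompareTo(r.Current.Timestamp);
                if (cmp == 0)
                {
                    // Append this input's value so Values stays in join order (a, b, c, ...).
                    var values = l.Current.Values.Concat(new[] { r.Current.Value }).ToArray();
                    yield return (l.Current.Timestamp, values);
                    hasL = l.MoveNext();
                    hasR = r.MoveNext();
                }
                else if (cmp < 0)
                {
                    hasL = l.MoveNext();  // timestamp missing from `next`: drop it
                }
                else
                {
                    hasR = r.MoveNext();  // timestamp missing from the accumulator: drop it
                }
            }
        }
    }

    // join(a, b, c, d) => join(join(join(a, b), c), d), written as a left fold.
    public static IEnumerable<(DateTimeOffset Timestamp, double[] Values)> JoinAll(
        IReadOnlyList<IEnumerable<DataPoint>> inputs)
    {
        IEnumerable<(DateTimeOffset Timestamp, double[] Values)> seed =
            inputs[0].Select(p => (p.Timestamp, new[] { p.Value }));
        return inputs.Skip(1).Aggregate(seed, (acc, next) => JoinStep(acc, next));
    }
}
```

Only timestamps present in every input survive, and each surviving row carries the values in the predefined order of the inputs.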
Edit: Below is what I could come up with in a hurry. Hopefully there is a simpler solution based on the existing Rx operators.
static void Test()
{
    var a = Observable.Range(0, 10);
    var b = Observable.Range(5, 10);
    //var zip = a.Zip(b, (x, y) => x + "-" + y);
    //zip.Subscribe(Console.WriteLine);
    var joined = MergeJoin(a, b, (x, y) => x + "-" + y);
    joined.Subscribe(Console.WriteLine);
}

static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
{
    return Observable.CreateWithDisposable<string>(o =>
    {
        Queue<int> a = new Queue<int>();
        Queue<int> b = new Queue<int>();
        object gate = new object();

        left.Subscribe(x =>
        {
            lock (gate)
            {
                if (a.Count == 0 || a.Peek() < x)
                    a.Enqueue(x);
                while (a.Count != 0 && b.Count != 0)
                {
                    if (a.Peek() == b.Peek())
                    {
                        o.OnNext(selector(a.Dequeue(), b.Dequeue()));
                    }
                    else if (a.Peek() < b.Peek())
                    {
                        a.Dequeue();
                    }
                    else
                    {
                        b.Dequeue();
                    }
                }
            }
        });

        right.Subscribe(x =>
        {
            lock (gate)
            {
                if (b.Count == 0 || b.Peek() < x)
                    b.Enqueue(x);
                while (a.Count != 0 && b.Count != 0)
                {
                    if (a.Peek() == b.Peek())
                    {
                        o.OnNext(selector(a.Dequeue(), b.Dequeue()));
                    }
                    else if (a.Peek() < b.Peek())
                    {
                        a.Dequeue();
                    }
                    else
                    {
                        b.Dequeue();
                    }
                }
            }
        });

        return Disposable.Empty;
    });
}
Upvotes: 8
Views: 3662
Reputation: 84734
I honestly can't think of a solution based on existing operators that works for hot sources of unknown order (that is, xs before ys vs. ys before xs). Your solution seems fine (hey, if it works), but I'd make a few changes if it were my code:
- Use MutableDisposable and CompositeDisposable so that disposing the result actually unsubscribes from both sources
- Call OnError for exceptions thrown from the selector (making it more consistent with other operators)
The code below has been tested with your dual-Range input, the same inputs flipped, as well as with Empty<int> + Never<int>:
public static IObservable<string> MergeJoin(
    IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
{
    return Observable.CreateWithDisposable<string>(o =>
    {
        Queue<int> a = new Queue<int>();
        Queue<int> b = new Queue<int>();
        object gate = new object();
        bool leftComplete = false;
        bool rightComplete = false;
        MutableDisposable leftSubscription = new MutableDisposable();
        MutableDisposable rightSubscription = new MutableDisposable();

        // Drains both queues in lockstep: equal heads are joined, the smaller head is discarded.
        Action tryDequeue = () =>
        {
            lock (gate)
            {
                while (a.Count != 0 && b.Count != 0)
                {
                    if (a.Peek() == b.Peek())
                    {
                        string value = null;
                        try
                        {
                            value = selector(a.Dequeue(), b.Dequeue());
                        }
                        catch (Exception ex)
                        {
                            // Report selector failures through OnError, like the built-in operators.
                            o.OnError(ex);
                            return;
                        }
                        o.OnNext(value);
                    }
                    else if (a.Peek() < b.Peek())
                    {
                        a.Dequeue();
                    }
                    else
                    {
                        b.Dequeue();
                    }
                }
            }
        };

        leftSubscription.Disposable = left.Subscribe(x =>
        {
            lock (gate)
            {
                if (a.Count == 0 || a.Peek() < x)
                    a.Enqueue(x);
                tryDequeue();
                // No further matches are possible once the other side has completed and drained.
                if (rightComplete && b.Count == 0)
                {
                    o.OnCompleted();
                }
            }
        }, o.OnError, () =>
        {
            lock (gate)
            {
                leftComplete = true;
                if (a.Count == 0 || rightComplete)
                {
                    o.OnCompleted();
                }
            }
        });

        rightSubscription.Disposable = right.Subscribe(x =>
        {
            lock (gate)
            {
                if (b.Count == 0 || b.Peek() < x)
                    b.Enqueue(x);
                tryDequeue();
                // Mirror of the left side: check the *left* source here.
                if (leftComplete && a.Count == 0)
                {
                    o.OnCompleted();
                }
            }
        }, o.OnError, () =>
        {
            lock (gate)
            {
                rightComplete = true;
                if (b.Count == 0 || leftComplete)
                {
                    o.OnCompleted();
                }
            }
        });

        return new CompositeDisposable(leftSubscription, rightSubscription);
    });
}
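The queue-and-drain core of the answer above does not depend on Rx, which makes it easy to check in isolation. A minimal sketch, assuming each side delivers ascending ints; `LockstepJoiner`, `OnLeft` and `OnRight` are hypothetical names, and completion, error handling and the duplicate guard are deliberately left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, Rx-free sketch of the queue-and-drain logic used by MergeJoin.
// Values may arrive from either side in any interleaving; each side is assumed ascending.
public sealed class LockstepJoiner
{
    private readonly Queue<int> _left = new Queue<int>();
    private readonly Queue<int> _right = new Queue<int>();
    private readonly object _gate = new object();
    private readonly Action<int> _onMatch;

    public LockstepJoiner(Action<int> onMatch) { _onMatch = onMatch; }

    public void OnLeft(int x) { lock (_gate) { _left.Enqueue(x); Drain(); } }
    public void OnRight(int y) { lock (_gate) { _right.Enqueue(y); Drain(); } }

    // Equal heads are emitted and both dequeued. A smaller head is discarded: every value
    // already queued or still to come on the other side is at least as large as that
    // side's head, so the smaller head can never find a partner.
    private void Drain()
    {
        while (_left.Count != 0 && _right.Count != 0)
        {
            int l = _left.Peek(), r = _right.Peek();
            if (l == r) { _left.Dequeue(); _right.Dequeue(); _onMatch(l); }
            else if (l < r) _left.Dequeue();
            else _right.Dequeue();
        }
    }
}
```

Feeding all of the right-hand values first and then the left-hand values (the "ys before xs" case) still yields 5 through 9 for the question's ranges. That order-independence is what lets the Rx version work with hot sources.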
Upvotes: 2
Reputation: 5216
This answer is copied from the Rx forums, just so that it will be archived in here as well:
var xs = Observable.Range(1, 10);
var ys = Observable.Range(5, 10);
var joined = from x in xs
             from y in ys
             where x == y
             select x + "-" + y;
Or without using query expressions:
var joined =
    xs.SelectMany(x => ys, (x, y) => new { x, y })
      .Where(t => t.x == t.y)
      .Select(t => t.x + "-" + t.y);
Upvotes: 2
Reputation: 12700
How about using the new Join operator in v.2838?
var a = Observable.Range(1, 10);
var b = Observable.Range(5, 10);
var joinedStream = a.Join(b,
        _ => Observable.Never<Unit>(),
        _ => Observable.Never<Unit>(),
        (aOutput, bOutput) => new Tuple<int, int>(aOutput, bOutput))
    .Where(tuple => tuple.Item1 == tuple.Item2);
joinedStream.Subscribe(output => Trace.WriteLine(output));
This is my first look at Join, and I'm not sure it would be wise to use the Never operator like this when dealing with large volumes of input: because the windows never close, every element of a is paired with every element of b before the Where filter runs, so the amount of work grows quickly as more inputs are received. I would think work could be done to close the windows as matches are made, making the solution more efficient. That said, the example above works as per your question.
For the record, I think Scott's answer is probably the way to go in this instance. I'm just throwing this in as a potential alternative.
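To put a number on the cost concern in the Join answer: with `Observable.Never` as both duration selectors, every left window overlaps every right window, so Join yields the full cross product and the `Where` discards almost all of it. The sketch below is a LINQ to Objects analogy under that assumption, not Rx code; `JoinCost.Count` is a hypothetical helper.

```csharp
using System;
using System.Linq;

// LINQ to Objects analogy of Join with never-closing windows:
// every element of a is paired with every element of b, then filtered.
public static class JoinCost
{
    public static (int Pairs, int Matches) Count(int[] a, int[] b)
    {
        var pairs = a.SelectMany(x => b, (x, y) => (X: x, Y: y)).ToArray(); // full cross product
        int matches = pairs.Count(p => p.X == p.Y);                         // what survives the Where
        return (pairs.Length, matches);
    }
}
```

With the answer's inputs (1..10 and 5..14) that is 100 pairs to keep 6 matches, and the pair count grows with the product of the input sizes, which is why closing windows early matters for large or long-running streams.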
Upvotes: 1
Reputation: 19117
GroupBy may do what you need. It seems that you have no time constraints on when items get "joined"; you just need similar items to end up together in some fashion.
Observable.Merge(Observable.Range(1, 10), Observable.Range(5, 15))
    .GroupBy(k => k)
    .Subscribe(go => go.Count().Where(cnt => cnt > 1)
        .Subscribe(cnt =>
            Console.WriteLine("Key {0} has {1} matches", go.Key, cnt)));
Two things to note about the above. First, Merge has the following overloads, so your requirement to join hundreds of streams won't present an issue:
Merge<TSource>(params IObservable<TSource>[] sources);
Merge<TSource>(this IEnumerable<IObservable<TSource>> sources);
Merge<TSource>(this IObservable<IObservable<TSource>> source);
Furthermore, GroupBy returns IObservable<IGroupedObservable<TKey, TSource>>, which means that you can react to each group, and to each new member of each group, as they come in; there is no need to wait until all sources complete.
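For checking what the Merge + GroupBy query reports, here is a LINQ to Objects analogue that accepts any number of sources, echoing the `IEnumerable<IObservable<TSource>>` overload of Merge. It is a sketch for illustration only; `GroupMatches.Find` is a hypothetical name, and unlike the Rx version, LINQ's `GroupBy` buffers the entire input before yielding any group, so it mirrors the result but not the push-based timing.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// LINQ to Objects analogue of Merge + GroupBy: flatten any number of sources,
// group by value, and keep the keys that were seen more than once.
public static class GroupMatches
{
    public static List<(int Key, int Count)> Find(IEnumerable<IEnumerable<int>> sources)
    {
        return sources
            .SelectMany(s => s)                          // analogue of Merge over many sources
            .GroupBy(k => k)                             // keys appear in first-seen order
            .Select(g => (Key: g.Key, Count: g.Count()))
            .Where(t => t.Count > 1)
            .ToList();
    }
}
```

For the ranges in the answer (1..10 and 5..19), keys 5 through 10 each appear twice and every other key appears once.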
Upvotes: 3