Reputation: 63
We're implementing some software components using C# and Reactive Extensions. They contain functionality that splits an observable using the GroupBy method, then performs some arithmetic on those split observables, and afterwards merges the observables back together with the Merge() method.
All goes well if the maxConcurrent parameter is not used. If this parameter is used, however, it seems data is 'lost'.
We tried searching for this issue, and tried to incorporate Observable.Start and Observable.Defer, but with no results. So we created a really small test application to show the problem:
using System;
using System.Linq;
using System.Reactive.Linq; // from the System.Reactive NuGet package

var sourceObservable = Enumerable.Range(0, 10)
    .Select(x => new { Index = x, Remainder = x % 3 }).ToObservable();
var ungrouped = sourceObservable.Select(x => x.Index);
// Split by remainder, then merge back with at most 2 concurrent subscriptions
var limitedGrouping = sourceObservable.GroupBy(x => x.Remainder)
    .Select(group => group.Select(x => x.Index)).Merge(maxConcurrent: 2);
// Split by remainder, then merge back without a concurrency limit
var unlimitedGrouping = sourceObservable.GroupBy(x => x.Remainder)
    .Select(group => group.Select(x => x.Index)).Merge();
Console.WriteLine($"ungrouped: {string.Join(",", await ungrouped.ToList())}");
Console.WriteLine($"limited: {string.Join(",", await limitedGrouping.ToList())}");
Console.WriteLine($"unlimited: {string.Join(",", await unlimitedGrouping.ToList())}");
We expected that in this case the content of 'limitedGrouping' would be identical to the content of 'unlimitedGrouping'. However, it is not:
ungrouped: 0,1,2,3,4,5,6,7,8,9
limited: 0,1,3,4,6,7,9
unlimited: 0,1,2,3,4,5,6,7,8,9
The limited one is missing the numbers 2, 5, and 8. What mistake are we making here?
Upvotes: 4
Views: 644
Reputation: 43474
As Shlomo explained in their answer, the grouped subsequences emitted by the GroupBy operator are hot, meaning that they emit values whether they are subscribed to or not. So if you don't subscribe to them immediately after their creation, you risk losing some of their values. In fact, you are guaranteed to lose at least one value, because each subsequence is created with an already known first value, which is emitted synchronously, right after any synchronous subscriptions to the new subsequence have had the chance to occur.
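A minimal sketch of what "hot" means here, written in plain C# without Rx. The callback list and the Emit helper are hypothetical stand-ins for a grouped subsequence, not Rx APIs: values are pushed only to whoever is subscribed at that moment, so a subscriber that arrives late never sees what was emitted before it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hot sequence (not Rx): values go to the current subscribers
// only, and are simply dropped when nobody is listening.
var subscribers = new List<Action<int>>();
void Emit(int value)
{
    foreach (var subscriber in subscribers) subscriber(value);
}

var early = new List<int>();
var late = new List<int>();

subscribers.Add(early.Add); // subscribed before anything is emitted
Emit(0);                    // the group's first value, pushed right away
Emit(3);
subscribers.Add(late.Add);  // subscribed late, like a postponed Merge subscription
Emit(6);
Emit(9);

Console.WriteLine($"early: {string.Join(",", early)}"); // early: 0,3,6,9
Console.WriteLine($"late:  {string.Join(",", late)}");  // late:  6,9
```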
On the other hand, the Merge operator with the maxConcurrent parameter achieves the concurrency limitation by postponing the subscription to some of the emitted subsequences. So combining GroupBy and Merge makes it very easy to lose values. Here is what happens in your example:
Source:   +---0---1---2---3---4---5---6---7---8---9---|
Mod-0:       +0-----------3-----------6-----------9---|
Mod-1:           +1-----------4-----------7-----------|
Mod-2:               +2-----------5-----------8-------|
Merge(2): +---0---1-------3---4-------6---7-------9---|
The Mod-2 subsequence is subscribed only when Mod-0 completes, at which point it has already emitted all of its values.
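The same timeline can be reproduced with a small plain C# simulation. This is a hypothetical model, not Rx's implementation: Simulate is an illustrative helper, and it assumes that each group is hot and that all groups complete only when the source completes (as with GroupBy, whose groups end when the source ends).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical simulation (plain C#, no Rx) of
//   source.GroupBy(i => i % 3).Merge(maxConcurrent)
// assuming hot groups that all complete together with the source.
List<int> Simulate(int maxConcurrent)
{
    var output = new List<int>();
    var known = new HashSet<int>();      // groups GroupBy has emitted so far
    var subscribed = new HashSet<int>(); // groups Merge is currently subscribed to
    var waiting = new Queue<int>();      // groups whose subscription Merge postponed

    foreach (var value in Enumerable.Range(0, 10))
    {
        var key = value % 3;
        if (known.Add(key))
        {
            // A new group reaches Merge: subscribe now if a slot is free, else queue it.
            if (subscribed.Count < maxConcurrent) subscribed.Add(key);
            else waiting.Enqueue(key);
        }
        // The group pushes the value immediately; only subscribed groups are observed.
        if (subscribed.Contains(key)) output.Add(value);
    }

    // Source completes: every group completes at this same moment, freeing the slots.
    // Queued groups are subscribed only now, with nothing left to emit.
    subscribed.Clear();
    while (waiting.Count > 0) subscribed.Add(waiting.Dequeue());
    return output;
}

Console.WriteLine(string.Join(",", Simulate(2))); // 0,1,3,4,6,7,9
Console.WriteLine(string.Join(",", Simulate(1))); // 0,3,6,9
Console.WriteLine(string.Join(",", Simulate(3))); // 0,1,2,3,4,5,6,7,8,9
```

With maxConcurrent set to 3 (one slot per group) nothing is postponed and nothing is lost, which matches the unlimited Merge() output in the question.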
The only solution to this problem I can think of is to make all grouped subsequences replayable, by using the Replay operator. This operator returns an IConnectableObservable<T> that you should connect to right away, so instead of the usual RefCount you should use the AutoConnect(0) operator, which connects immediately without waiting for any subscriber:
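var limitedGrouping = sourceObservable
    .GroupBy(x => x.Remainder)
    // Replay records each group from the moment it is created (AutoConnect(0)
    // connects immediately), so a postponed subscription from Merge misses nothing
    .Select(group => group.Replay().AutoConnect(0).Select(x => x.Index))
    .Merge(maxConcurrent: 2);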
Of course, this solution has the downside of increased memory usage, since the values of every group are buffered. Depending on the situation, this may be anything from OK to unacceptable.
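To see why replaying fixes the problem, here is a minimal plain C# sketch of the buffering idea. It is not Rx's implementation; the Emit and Subscribe helpers are hypothetical. Every value is recorded as soon as it is emitted (the effect of connecting immediately with AutoConnect(0)), and a late subscriber is first handed the recorded history.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical replaying group (not Rx): records every value it emits and
// replays the recorded history to each new subscriber before going live.
var buffer = new List<int>();
var subscribers = new List<Action<int>>();

void Emit(int value)
{
    buffer.Add(value); // kept for future subscribers: this is the memory cost
    foreach (var subscriber in subscribers) subscriber(value);
}

void Subscribe(Action<int> observer)
{
    foreach (var value in buffer) observer(value); // replay the history first
    subscribers.Add(observer);                     // then receive live values
}

// The mod-2 group emits all of its values before anyone subscribes to it...
Emit(2);
Emit(5);
Emit(8);

// ...and the late subscription (the one Merge postponed) still sees them.
var late = new List<int>();
Subscribe(late.Add);
Console.WriteLine(string.Join(",", late)); // 2,5,8
```

The sketch is single-threaded and ignores completion notifications; its only purpose is to show that the buffer, which holds every value whether or not anyone ever subscribes, is what turns a late subscription from lossy into lossless.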
Upvotes: 0
Reputation: 14350
It looks like intended-but-confusing behavior of GroupBy. This code is equivalent, and fails similarly:
var source = Observable.Range(0, 10);
source
.GroupBy(i => i % 3)
.Merge(2)
.Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 9
This code is similar, but it succeeds:
var a = source.Where(i => i % 3 == 0);
var b = source.Where(i => i % 3 == 1);
var c = source.Where(i => i % 3 == 2);
var l = new List<IObservable<int>>() { a, b, c };
l.ToObservable()
.Merge(2)
.Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 9 2 5 8
Somewhat more trippy is this:
source
.GroupBy(i => i % 3)
.Concat() //or .Merge(1), those are roughly equivalent.
.Subscribe(Console.WriteLine); //Outputs 0 3 6 9
When I first looked at this, I expected all the Merge(2) cases to output 0 1 3 4 6 7 9 2 5 8. I expected Concat, which is basically Merge(1), to output 0 3 6 9 1 4 7 2 5 8.
Merge(maxConcurrent: n) says that only n inner observables should be subscribed to at once. If it receives more than n observables, it queues the extra ones and subscribes to them later, as the old ones complete.
In our case, it receives three observables (mod-0, mod-1, and mod-2), in that order. It subscribes to the first two, then queues the mod-2 observable, subscribing to it only when mod-0 or mod-1 is done. However, by the time mod-0/mod-1 are done, the mod-2 observable is also done (the groups of GroupBy complete only when the source completes), so no notifications are received.
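The queueing described above can be sketched as a small state machine. This is a hypothetical, single-threaded model of the bookkeeping, not the code of Rx's Merge: OnInner and OnInnerCompleted are illustrative names for "a new inner observable arrived" and "an active inner observable completed".

```csharp
using System;
using System.Collections.Generic;

// Hypothetical bookkeeping behind Merge(maxConcurrent: n): at most n inner
// observables are subscribed; extra ones wait in a FIFO queue and are
// subscribed one by one as active ones complete.
const int maxConcurrent = 2;
var active = new HashSet<string>();
var queued = new Queue<string>();
var log = new List<string>();

void OnInner(string name)
{
    if (active.Count < maxConcurrent) { active.Add(name); log.Add($"subscribe {name}"); }
    else { queued.Enqueue(name); log.Add($"queue {name}"); }
}

void OnInnerCompleted(string name)
{
    active.Remove(name);
    log.Add($"completed {name}");
    if (queued.Count > 0) OnInner(queued.Dequeue()); // a slot is free again
}

OnInner("mod-0");
OnInner("mod-1");
OnInner("mod-2");          // no free slot, so mod-2 waits
OnInnerCompleted("mod-0"); // only now does mod-2 get subscribed

Console.WriteLine(string.Join(" | ", log));
// subscribe mod-0 | subscribe mod-1 | queue mod-2 | completed mod-0 | subscribe mod-2
```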
When I first looked at this, I thought it was a bug, because I expected the child observables of GroupBy to be cold. But it looks like they are collectively warm, if that makes any sense: subscribe to one of the children, and the others become hot. This makes sense given that GroupBy can be used as an operator over either cold or hot observables, and there's no replay functionality baked in.
If you want to see this demonstrated, consider this:
source
.GroupBy(i => i % 3)
.Select(o => o.Take(3))
.Merge(2)
.Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 8
Here the mod-0 observable is unsubscribed from after 6, the third mod-0 number. Merge then subscribes to the hot mod-2 observable and outputs only the last mod-2 number, 8.
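The Take(3) trace can be checked with a plain C# simulation. This is a hypothetical model rather than Rx code (Simulate and Arrive are illustrative helpers). It assumes hot groups, a Take that counts only the values its own subscription observes, and a Merge that subscribes to the next queued group as soon as a slot is freed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical simulation (plain C#, no Rx) of
//   source.GroupBy(i => i % 3).Select(o => o.Take(take)).Merge(maxConcurrent)
// assuming hot groups: a group's values are seen only while Merge is subscribed.
List<int> Simulate(int maxConcurrent, int take)
{
    var output = new List<int>();
    var known = new HashSet<int>();        // groups GroupBy has emitted so far
    var seen = new Dictionary<int, int>(); // subscribed group -> values its Take passed
    var waiting = new Queue<int>();        // groups whose subscription is postponed

    void Arrive(int key)
    {
        if (seen.Count < maxConcurrent) seen[key] = 0; // subscribe now
        else waiting.Enqueue(key);                     // postpone the subscription
    }

    foreach (var value in Enumerable.Range(0, 10))
    {
        var key = value % 3;
        if (known.Add(key)) Arrive(key);
        if (!seen.ContainsKey(key)) continue; // hot group, nobody listening: value lost

        output.Add(value);
        if (++seen[key] == take)
        {
            // Take completes and unsubscribes, freeing a slot for a queued group,
            // which from now on sees only the values emitted after this point.
            seen.Remove(key);
            if (waiting.Count > 0) Arrive(waiting.Dequeue());
        }
    }
    return output;
}

Console.WriteLine(string.Join(" ", Simulate(2, 3))); // 0 1 3 4 6 7 8
```

The final 9 is dropped because the mod-0 subscription was already disposed by its Take, and 2 and 5 are dropped because mod-2 was still queued when they were pushed.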
I hope that helps. If you're unfamiliar with the System.Reactive concept of observable temperature, I recommend this article.
Upvotes: 2