Bart van der Heijden

Reputation: 63

Limiting concurrency of Observable GroupBy / Merge Combination

We're implementing some software components using C# and Reactive Extensions. They contain functionality that splits an observable using the GroupBy method, then performs some arithmetic on those split observables, and afterwards merges the observables back together with the Merge() method.

All goes well if the maxConcurrent parameter is not used. If this parameter is used, however, it seems that data is 'lost'.

We tried searching for this issue, and tried incorporating Observable.Start and Observable.Defer, without result. We created a very small test application to show the problem:

using System;
using System.Linq;
using System.Reactive.Linq; // ToObservable, GroupBy, Merge, and awaiting an IObservable

var sourceObservable = Enumerable.Range(0, 10)
    .Select(x => new { Index = x, Remainder = x % 3 }).ToObservable();

var ungrouped = sourceObservable.Select(x => x.Index);
var limitedGrouping = sourceObservable.GroupBy(x => x.Remainder)
    .Select(group => group.Select(x => x.Index)).Merge(maxConcurrent: 2);
var unlimitedGrouping = sourceObservable.GroupBy(x => x.Remainder)
    .Select(group => group.Select(x => x.Index)).Merge();

Console.WriteLine($"ungrouped: {string.Join(",", await ungrouped.ToList())}");
Console.WriteLine($"limited: {string.Join(",", await limitedGrouping.ToList())}");
Console.WriteLine($"unlimited: {string.Join(",", await unlimitedGrouping.ToList())}");

We expected that in this case the content of 'limitedGrouping' would be identical to the content of 'unlimitedGrouping'. However, it is not:

ungrouped: 0,1,2,3,4,5,6,7,8,9

limited: 0,1,3,4,6,7,9

unlimited: 0,1,2,3,4,5,6,7,8,9

The limited one is missing the values 2, 5, and 8. What mistake are we making here?

Upvotes: 4

Views: 644

Answers (2)

Theodor Zoulias

Reputation: 43474

As Shlomo explained in their answer, the grouped subsequences emitted by GroupBy are hot, meaning that they emit values whether or not anyone has subscribed to them. So if you don't subscribe to them immediately after their creation, you risk losing some of their values. In fact, if you postpone the subscription you are guaranteed to lose at least one value, because each subsequence is created with an already known first value, which is emitted synchronously right after the subsequence has been handed to its observer, leaving room only for synchronous subscriptions.

On the other hand, the Merge operator with the maxConcurrent parameter achieves the concurrency limitation by postponing the subscription to some of the emitted subsequences. So combining GroupBy with Merge(maxConcurrent) makes it very easy to lose values. Here is what happens in your example:

Source:   +---0---1---2---3---4---5---6---7---8---9---|
Mod-0:        +0----------3-----------6-----------9---|
Mod-1:            +1----------4-----------7-----------|
Mod-2:                +2----------5-----------8-------|
Merge(2): +----0---1------3---4-------6---7-------9---|

The Mod-2 subsequence was subscribed only when Mod-0 completed, at which point it had already emitted all of its values.

The only solution to this problem I can think of is to make all grouped subsequences replayable by using the Replay operator. This operator returns an IConnectableObservable<T>, which you should connect to right away, so instead of the usual RefCount you should use the AutoConnect(0) operator:

var limitedGrouping = sourceObservable
    .GroupBy(x => x.Remainder)
    .Select(group => group.Replay().AutoConnect(0).Select(x => x.Index))
    .Merge(maxConcurrent: 2);

Of course this solution has the downside of increasing the memory usage. Depending on the situation, this may be anything from OK to unacceptable.
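To make the mechanism concrete without depending on Rx internals, here is a minimal, simplified model in plain C#. `HotGroup` and `GroupByMergeModel` are hypothetical names invented for this sketch, not System.Reactive types. The model assumes that each new group is handed to the merging consumer before its first value is emitted, that no values are emitted after the source completes, and it approximates `Replay().AutoConnect(0)` as a group that buffers every value from the moment it is created.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Usage: reproduces the "limited" output, then the effect of making the groups replayable.
Console.WriteLine("limited:  " + string.Join(",",
    GroupByMergeModel.Run(Enumerable.Range(0, 10), x => x % 3, maxConcurrent: 2, replay: false)));
// limited:  0,1,3,4,6,7,9
Console.WriteLine("replayed: " + string.Join(",",
    GroupByMergeModel.Run(Enumerable.Range(0, 10), x => x % 3, maxConcurrent: 2, replay: true)));
// replayed: 0,1,3,4,6,7,9,2,5,8

// Hypothetical model of a grouped subsequence. Without replay it is hot: a value
// reaches only the subscribers that exist at the moment it is emitted.
// With replay it also buffers every value and replays the buffer to late subscribers.
public sealed class HotGroup
{
    private readonly List<Action<int>> _subscribers = new();
    private readonly List<int> _buffer = new();
    private readonly bool _replay;

    public HotGroup(bool replay) => _replay = replay;

    public void Subscribe(Action<int> onNext)
    {
        if (_replay)
            foreach (var value in _buffer) onNext(value); // late subscriber catches up
        _subscribers.Add(onNext);
    }

    public void Emit(int value)
    {
        if (_replay) _buffer.Add(value);
        foreach (var subscriber in _subscribers) subscriber(value); // lost if nobody listens
    }
}

public static class GroupByMergeModel
{
    // Simplified model of source.GroupBy(keySelector).Merge(maxConcurrent):
    // returns the values the merged stream would deliver, in order.
    public static List<int> Run(IEnumerable<int> source, Func<int, int> keySelector,
                                int maxConcurrent, bool replay)
    {
        var output = new List<int>();
        var groups = new Dictionary<int, HotGroup>();
        var pending = new Queue<HotGroup>(); // groups received by Merge but not yet subscribed
        var active = 0;

        foreach (var item in source)
        {
            var key = keySelector(item);
            if (!groups.TryGetValue(key, out var group))
            {
                group = new HotGroup(replay);
                groups.Add(key, group);
                // A new group reaches Merge before its first value is emitted;
                // Merge subscribes only if a concurrency slot is free.
                if (active < maxConcurrent)
                {
                    active++;
                    group.Subscribe(output.Add);
                }
                else
                {
                    pending.Enqueue(group);
                }
            }
            group.Emit(item);
        }

        // The source completes, so every group completes. As active groups complete,
        // Merge subscribes to the pending ones, but no further values will be emitted:
        // a pending group contributes only what it has buffered, if anything.
        while (pending.Count > 0)
            pending.Dequeue().Subscribe(output.Add);

        return output;
    }
}
```

With `maxConcurrent: 3` (one slot per group) the model subscribes to every group as soon as it appears and returns all ten values in source order, matching the unlimited case. The replay variant recovers 2, 5, and 8 only because it keeps every value of the queued group in memory, which is the same trade-off as the Replay fix.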

Upvotes: 0

Shlomo

Reputation: 14350

This looks like intended-but-confusing behavior of GroupBy. The following code is equivalent and fails in the same way:

var source = Observable.Range(0, 10);
source
    .GroupBy(i => i % 3)
    .Merge(2)
    .Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 9

This code is similar, but it succeeds:

var a = source.Where(i => i % 3 == 0);
var b = source.Where(i => i % 3 == 1);
var c = source.Where(i => i % 3 == 2);
var l = new List<IObservable<int>>() { a, b, c };
l.ToObservable()
    .Merge(2)
    .Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 9 2 5 8

Somewhat more trippy is this:

source
    .GroupBy(i => i % 3)
    .Concat() //or .Merge(1), those are roughly equivalent.
    .Subscribe(Console.WriteLine); //Outputs 0 3 6 9

When I first looked at this, I expected all the Merge(2) cases to output 0 1 3 4 6 7 9 2 5 8, and Concat, which is basically Merge(1), to output 0 3 6 9 1 4 7 2 5 8.

Merge with maxConcurrent set to n subscribes to at most n inner observables at once. If it receives more than n observables, it queues the extra ones and subscribes to them later, as earlier ones complete.

In our case, it receives three observables (mod-0, mod-1, and mod-2) in that order. It subscribes to the first two, then queues the mod-2 observable, subscribing to it only when mod-0 or mod-1 completes. However, by the time mod-0 or mod-1 completes, the mod-2 observable has also completed, so no notifications are received.

When I first looked at this, I thought it was a bug, because I thought that the child-observables of GroupBy should have been cold. But it looks like they are collectively warm, if that makes any sense: Subscribe to one of the children, and the others become hot. This makes sense in the context that GroupBy can be used as an operator over either cold or hot observables, and there's no replay functionality baked in.

If you want to see this demonstrated, consider this:

source
    .GroupBy(i => i % 3)
    .Select(o => o.Take(3))
    .Merge(2)
    .Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 8

Here the mod-0 observable is unsubscribed from after 6, the third mod-0 number. Merge then subscribes to the hot mod-2 observable and receives only its last number, 8.

I hope that helps. If you're unfamiliar with the System.Reactive concept of observable temperature, I recommend this article.

Upvotes: 2
