excbot

Reputation: 123

Is there a limit to the number of GroupedFlux&lt;T&gt; created by the groupBy operator?

According to the reference documentation, the groupBy operator splits a given Flux into multiple GroupedFlux instances depending on the operator's key mapper function. If I execute the following code with a range of 257 integers, it works correctly, but not with 258:

    public void groupByTest() {
        Flux.range(1, 258)
                .groupBy(val -> val)
                .concatMap(g -> g.map(val -> val + "test"))
                .doOnNext(System.out::println)
                .blockLast();
    }

Does that mean that the groupBy operator cannot create more than 257 groups?

Upvotes: 4

Views: 2362

Answers (1)

Simon Baslé

Reputation: 28301

As stated in the groupBy javadoc:

The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).

What that means is that once a group is emitted, groupBy needs more requests from downstream in order to make progress. By default, it opens up to 256 groups, and then it needs either more requests or to detect that a group is complete. And groupBy cannot "know" that a group is complete until either:

  • A) the group is cancelled (in which case it will recreate a new group if a value with the same key appears later)
  • B) the source has been entirely processed (which can only happen if the source has fewer than 256 elements, the default groupBy prefetch, or if groupBy has received the onComplete signal from the source)

Both the val -> val criterion and concatMap work against these requirements.

The groupBy criterion ultimately generates as many groups as there are values: here 258 groups, versus groupBy's default capacity to keep track of 256 groups.
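One way to see that 256 is the default prefetch rather than a hard cap on groups (my own illustration, not part of the original answer; it assumes reactor-core on the classpath) is to use the groupBy overload that takes an explicit prefetch:

```java
import reactor.core.publisher.Flux;

public class GroupByPrefetchSketch {
    public static void main(String[] args) {
        // Raising the prefetch above the number of groups lets groupBy consume
        // the entire source, receive its onComplete, and complete every group,
        // so even the sequential concatMap can drain the groups one by one.
        Flux.range(1, 258)
                .groupBy(val -> val, 512) // prefetch of 512 instead of the default 256
                .concatMap(g -> g.map(val -> val + "test"))
                .doOnNext(System.out::println)
                .blockLast();
    }
}
```

With the larger prefetch, the same 258-group pipeline that hangs by default now terminates.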

Note: If the whole sequence opens fewer than 256 groups, it works fine. Try setting the criterion to val -> val % 2 and see that it works. Then try to bump the range to range(1, 513) and see how it hangs again.
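Sketched out, that two-group variant looks like this (my reproduction of the suggestion above, assuming reactor-core on the classpath):

```java
import reactor.core.publisher.Flux;

public class GroupByModuloSketch {
    public static void main(String[] args) {
        // Only two groups (even/odd keys), so groupBy stays well under its
        // 256-group bookkeeping and the pipeline completes.
        Flux.range(1, 258)
                .groupBy(val -> val % 2)
                .concatMap(g -> g.map(val -> val + "test"))
                .doOnNext(System.out::println)
                .blockLast(); // completes; bump the range to 513 and it hangs
    }
}
```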

The last test was limited to 512 elements due to how concatMap works.

concatMap is especially bad in our case, because it will only subscribe to the next group and make progress when the first group has completed. This clashes with condition B) above, creating a situation where neither groupBy nor concatMap can make progress.

Note: In the small example with 513 elements, concatMap would start consuming group 1 and wait for it to complete before it consumes group 2. BUT groupBy stops emitting once it has fetched 256 elements for group 1, and then waits for downstream to start consuming group 2. As a result, it has too little data to detect that the group is complete, concatMap waits for that completion signal and never subscribes to group 2, hanging the whole thing.

Using a flatMap would fix that, because flatMap subscribes to multiple groups concurrently, and two groups are no trouble for it: it will consume both groups and make progress.
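A minimal sketch of that fix on the 513-element, two-group variant (my own illustration, assuming reactor-core on the classpath):

```java
import reactor.core.publisher.Flux;

public class GroupByFlatMapSketch {
    public static void main(String[] args) {
        // flatMap subscribes to both groups concurrently (its default
        // concurrency is 256), so it drains each group as data arrives and
        // groupBy can keep requesting from the source until onComplete.
        Flux.range(1, 513)
                .groupBy(val -> val % 2)
                .flatMap(g -> g.map(val -> val + "test"))
                .doOnNext(System.out::println)
                .blockLast();
    }
}
```

Note that flatMap does not preserve the group-by-group ordering that concatMap would have given; elements from the two groups are interleaved.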

Upvotes: 7
