Reputation: 123
According to the reference documentation, the groupBy operator splits a given Flux into multiple GroupedFlux instances depending on the operator's keyMapper function. If I execute the following code with a range of 257 integers, it works correctly, but not with 258:
public void groupByTest() {
    Flux.range(1, 258)
        .groupBy(val -> val)
        .concatMap(g -> g.map(val -> val + "test"))
        .doOnNext(System.out::println)
        .blockLast();
}
Does that mean that the groupBy operator cannot create more than 257 groups?
Upvotes: 4
Views: 2362
Reputation: 28301
As stated in the groupBy javadoc:
The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).
What that means is that once a group is emitted, groupBy needs to receive more requests for it to make progress. By default, it opens up to 256 groups, and then it needs either more requests or to detect that a group is complete. And groupBy cannot "know" if a group is complete until either:
[...] prefetch, or if groupBy received an onComplete signal from the source)
Both the val -> val criteria and concatMap work against these requirements.
The groupBy criteria ultimately generates as many groups as there are values: here 258 groups, versus a default capacity for groupBy to keep track of 256 groups.
Note: If the whole sequence opens fewer than 256 groups, it works fine. Try setting the criteria to val -> val % 2 and see that it works. Then try bumping the range to range(1, 513) and see how it hangs again.
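To make the difference between the two criteria concrete, here is a minimal plain-Java sketch that only counts how many distinct keys (and therefore groups) each criterion would yield. It does not use Reactor at all; the class name GroupCount and the helper countGroups are hypothetical names chosen for illustration, and the helper mimics the (start, count) convention of Flux.range.

```java
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;

public class GroupCount {

    // Counts the distinct keys produced by keyMapper over `count` integers
    // starting at `start` (same convention as Flux.range(start, count)).
    static long countGroups(int start, int count, IntUnaryOperator keyMapper) {
        return IntStream.range(start, start + count) // end bound is exclusive
                .map(keyMapper)
                .distinct()
                .count();
    }

    public static void main(String[] args) {
        // Identity criterion: one group per value, above the default of 256.
        System.out.println(countGroups(1, 258, val -> val));     // 258
        // Modulo criterion: two groups, however long the range is.
        System.out.println(countGroups(1, 258, val -> val % 2)); // 2
        System.out.println(countGroups(1, 513, val -> val % 2)); // 2
    }
}
```

With only two groups, the number of groups is no longer the limiting factor, so any remaining hang has to come from how the groups are consumed downstream.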
The val % 2 test tops out at 512 elements because of how concatMap works.
concatMap is especially bad in our case, because it will only subscribe to the next group and make progress when the first group has completed. This clashes with condition B) above, creating a situation where neither groupBy nor concatMap can make progress.
Note: In the small example with 513, concatMap would start consuming group 1 and wait for it to complete before it consumes group 2. BUT groupBy stops emitting once it has fetched 256 elements for group 1 and then waits for downstream to start consuming group 2. As a result, it has just too little data to detect that the group is complete, concatMap waits for that completion signal and never subscribes to group 2, and the whole thing hangs.
Using a flatMap would fix that, because flatMap will subscribe to multiple groups concurrently, and 2 groups is no trouble for it: it will consume both groups and make progress.
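As a rough sketch of that fix, the val % 2 example over range(1, 513) can be rewritten with flatMap in place of concatMap. This assumes reactor-core is on the classpath; the class name GroupByFlatMapSketch, the run helper, and the 5-second timeout are illustrative choices, not part of the original code. The timeout is there only so that a stalled pipeline fails with an exception instead of blocking forever.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class GroupByFlatMapSketch {

    // Splits the range into two groups (even and odd) and consumes both
    // concurrently with flatMap, so neither group waits on the other.
    static List<String> run(int count) {
        return Flux.range(1, count)
                .groupBy(val -> val % 2)
                .flatMap(g -> g.map(val -> val + "test"))
                .collectList()
                // Fails with an IllegalStateException if nothing completes in time.
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        List<String> results = run(513);
        System.out.println(results.size());
    }
}
```

Note that flatMap has its own concurrency limit (the maxConcurrency mentioned in the javadoc quote), so a criterion that produces a very large number of groups can still stall; the sketch only covers the two-group case discussed here.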
Upvotes: 7