Reputation: 4481
Say I have this:
Flux<GroupedFlux<Integer, Integer>> intsGrouped = Flux.range(0, 12)
.groupBy(i -> i % 3);
and say I have a method:
Mono<Integer> getFromService(Integer i);
I want to call getFromService
in parallel for each of the groups, but make sure the calls are serial within each group.
For the above example that would be three parallel streams with these input values:
stream 1: 0 -> 3 -> 6 -> 9
stream 2: 1 -> 4 -> 7 -> 10
stream 3: 2 -> 5 -> 8 -> 11
I tried this, but it's not doing what I want:
Flux.range(0, 12)
.groupBy(i -> i % 3)
.flatMap(g -> g.flatMap(i -> getFromService(g.key(), i)))
This is calling the service in parallel for all the ints at once. How do I proceed?
Upvotes: 1
Views: 2578
Reputation: 4536
Use either concatMap
or flatMapSequential
instead of the inner .flatMap
If you want sequential execution within each group (i.e. only one subscription to getFromService
at a single time within each group), then use .concatMap
, like this:
Flux.range(0, 12)
.groupBy(i -> i % 3)
.flatMap(g -> g.concatMap(i -> getFromService(g.key(), i)))
If parallel execution within a group is ok, but you just care about the order in which the sequence is emitted, then use flatMapSequential
, like this:
Flux.range(0, 12)
.groupBy(i -> i % 3)
.flatMap(g -> g.flatMapSequential(i -> getFromService(g.key(), i)))
Another option is to use .flatMap
with the concurrency
argument set to 1
, but I'd recommend one of the above instead.
Upvotes: 2