levant pied
levant pied

Reputation: 4481

Flux parallel-serial execution with groupBy

Say I have this:

Flux<GroupedFlux<Integer, Integer>> intsGrouped = Flux.range(0, 12)
   .groupBy(i -> i % 3);

and say I have a method:

Mono<Integer> getFromService(Integer i);

I want to call getFromService in parallel for each of the groups, but make sure the calls are serial within each group.

For the above example that would be three parallel streams with these input values:

stream 1: 0 -> 3 -> 6 -> 9
stream 2: 1 -> 4 -> 7 -> 10
stream 3: 2 -> 5 -> 8 -> 11

I tried this, but it's not doing what I want:

Flux.range(0, 12)
   .groupBy(i -> i % 3)
   .flatMap(g -> g.flatMap(i -> getFromService(g.key(), i)))

This is calling the service in parallel for all the ints at once. How do I proceed?

Upvotes: 1

Views: 2578

Answers (1)

Phil Clay
Phil Clay

Reputation: 4536

Use either concatMap or flatMapSequential instead of the inner .flatMap

If you want sequential execution within each group (i.e. only one subscription to getFromService at a single time within each group), then use .concatMap, like this:

Flux.range(0, 12)
   .groupBy(i -> i % 3)
   .flatMap(g -> g.concatMap(i -> getFromService(g.key(), i)))

If parallel execution within a group is ok, but you just care about the order in which the sequence is emitted, then use flatMapSequential, like this:

Flux.range(0, 12)
    .groupBy(i -> i % 3)
    .flatMap(g -> g.flatMapSequential(i -> getFromService(g.key(), i)))

Another option is to use .flatMap with the concurrency argument set to 1, but I'd recommend one of the above instead.

Upvotes: 2

Related Questions