ropes-nopes
ropes-nopes

Reputation: 205

Whats the difference between flatMap, flatMapSequential and concatMap in Project Reactor?

I've read from the documentation that flatMap:

Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.

that flatMapSequential:

Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source element.

and that concatMap:

Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation. There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:

Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.

Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.

Interleaving: this operator does not let values from different inners interleave (concatenation).

The difference between flatMap and the other two is pretty understandable, but I don't understand when the difference between concatMap and flatMapSequential takes place. Is there any performance difference between the two? I've read that flatMapSequential has a buffer size for some queue, but I don't understand why concatMap doesn't need one.

Upvotes: 13

Views: 13646

Answers (1)

lkatiforis
lkatiforis

Reputation: 6255

The flatMap and flatMapSequential operators subscribe eagerly, the concatMap waits for each inner completion before generating the next sub-stream and subscribing to it.

Let's see an example:

  @Test
  void test_flatMap() {
    Flux.just(1, 2, 3)
        .flatMap(this::doSomethingAsync)
        //.flatMapSequential(this::doSomethingAsync)
        //.concatMap(this::doSomethingAsync)
        .doOnNext(n -> log.info("Done {}", n))
        .blockLast();
  }

  private Mono<Integer> doSomethingAsync(Integer number) {
    //add some delay for the second item...
    return number == 2 ? Mono.just(number).doOnNext(n -> log.info("Executing {}", n)).delayElement(Duration.ofSeconds(1))
        : Mono.just(number).doOnNext(n -> log.info("Executing {}", n));
  }

Output:

2022-04-22 19:38:49,164  INFO main - Executing 1
2022-04-22 19:38:49,168  INFO main - Done 1
2022-04-22 19:38:49,198  INFO main - Executing 2
2022-04-22 19:38:49,200  INFO main - Executing 3
2022-04-22 19:38:49,200  INFO main - Done 3
2022-04-22 19:38:50,200  INFO parallel-1 - Done 2

As you can see, flatMap does not preserve original ordering, and has subscribed to all three elements eagerly. Also, notice that element 3 has proceeded before element 2.

Here is the output using flatMapSequential:

2022-04-22 19:53:40,229  INFO main - Executing 1
2022-04-22 19:53:40,232  INFO main - Done 1
2022-04-22 19:53:40,261  INFO main - Executing 2
2022-04-22 19:53:40,263  INFO main - Executing 3
2022-04-22 19:53:41,263  INFO parallel-1 - Done 2
2022-04-22 19:53:41,264  INFO parallel-1 - Done 3

flatMapSequential has subscribed to all three elements eagerly like flatMap but preserves the order by queuing elements received out of order.

Here is the output using concatMap:

2022-04-22 19:59:31,817  INFO main - Executing 1
2022-04-22 19:59:31,820  INFO main - Done 1
2022-04-22 19:59:31,853  INFO main - Executing 2
2022-04-22 19:59:32,857  INFO parallel-1 - Done 2
2022-04-22 19:59:32,857  INFO parallel-1 - Executing 3
2022-04-22 19:59:32,857  INFO parallel-1 - Done 3

concatMap naturally preserves the same order as the source elements.

Upvotes: 38

Related Questions