Stav Alfi

Reputation: 13923

How does flatMap manage threads?

Flux.just("a", "b", "c")
        .log(null, Level.INFO,true) // line 18
        .flatMap(value -> Mono.just(value.toUpperCase())
                              .publishOn(Schedulers.elastic()), 2)
        .log(null, Level.INFO,true) // line 21
        .subscribe();

Some of the output:

13:03:46 [main] INFO  - | request(2)    Flux.log(App.java:18)
13:03:46 [main] INFO  - | onNext(a)     Flux.log(App.java:18)
13:03:46 [main] INFO  - | onNext(b)     Flux.log(App.java:18)
13:03:46 [elastic-2] INFO  - onNext(A)  Flux.log(App.java:21)
13:03:46 [elastic-2] INFO  - | request(1)   Flux.log(App.java:18)
13:03:46 [main] INFO  - | onNext(c)     Flux.log(App.java:18)
13:03:46 [elastic-3] INFO  - onNext(B)  Flux.log(App.java:21)
13:03:46 [elastic-3] INFO  - | request(1)   Flux.log(App.java:18)
13:03:46 [elastic-2] INFO  - onNext(C)  Flux.log(App.java:21)
13:03:46 [elastic-2] INFO  - | request(1)   Flux.log(App.java:18)
13:03:46 [main] INFO  - | onComplete()  Flux.log(App.java:18)
13:03:46 [main] INFO  - onComplete()    Flux.log(App.java:21)

Questions:

  1. Why does flatMap ask for 2 elements from the main thread and then ask for more elements from the other threads?

  2. Why isn't the subscribe handled by the main thread?

Upvotes: 2

Views: 1350

Answers (1)

Oleh Dokuka

Reputation: 12184

Why 2 requested from the main thread?

The size of the first Subscription.request depends on the concurrency level you specified, which is 2. Since you call .subscribe on the main thread, that first request is issued on that same thread.

Let's take a look at the following schema:

.subscribe() [Thread Main] -> FluxLog.source.subscribe() [Thread Main] -> FluxFlatMap.source.subscribe() [Thread Main] -> FluxJust.subscriber.onSubscribe() -> FluxFlatMap.subscription.request(concurrency) [Thread Main]
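The chain above can be imitated with the JDK's java.util.concurrent.Flow interfaces. This is only a minimal sketch, not Reactor's code: InitialRequestSketch, RecordingSubscription and FlatMapLikeSubscriber are hypothetical names invented for illustration, and the lambda publisher stands in for a synchronous source such as Flux.just. It shows that when everything upstream is synchronous, the first request(concurrency) runs on the thread that called subscribe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical stand-in for the subscribe chain; not Reactor's implementation.
final class InitialRequestSketch {

    /** Records "request(n) on <thread>" for every demand signal it receives. */
    static final class RecordingSubscription implements Flow.Subscription {
        final List<String> calls = new ArrayList<>();

        @Override
        public void request(long n) {
            calls.add("request(" + n + ") on " + Thread.currentThread().getName());
        }

        @Override
        public void cancel() { }
    }

    /** Asks upstream for `concurrency` items as soon as it is subscribed. */
    static final class FlatMapLikeSubscriber implements Flow.Subscriber<String> {
        private final int concurrency;

        FlatMapLikeSubscriber(int concurrency) {
            this.concurrency = concurrency;
        }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(concurrency); }
        @Override public void onNext(String item) { }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    static List<String> run(int concurrency) {
        RecordingSubscription subscription = new RecordingSubscription();
        // A synchronous source calls onSubscribe on the thread that calls subscribe().
        Flow.Publisher<String> source = subscriber -> subscriber.onSubscribe(subscription);
        source.subscribe(new FlatMapLikeSubscriber(concurrency));
        return subscription.calls;
    }

    public static void main(String[] args) {
        System.out.println(run(2));
    }
}
```

Run from main, the list holds a single entry, request(2) on main, because no step between subscribe and request switches threads.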

What happens next?

From this point on it gets hardcore :). Each inner stream is subscribed to by a FlatMapInner, which observes all of its signals (onNext, onError, onComplete) on Schedulers.elastic() because of your .publishOn. When an inner stream completes, FlatMapInner.onComplete notifies FlatMapMain, which drives the whole flatMap mechanism. That interaction goes through FlatMapMain.innerComplete. From FlatMapMain's perspective each FlatMapInner plays the role of a queue, so all of its elements get drained.

Keep calm and don't panic if you have no idea what is going on here. The whole idea of that method is to drain data from the inner stream, emit it downstream, and then request a new portion of data from upstream. The thing to remember is that innerComplete is called from FlatMapInner.onComplete, which was moved to another Scheduler. That means the next Subscription.request is called from a thread of the scheduler specified in Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic()).

So, schematically, that process looks like this:

FluxFlatMap.FlatMapMain.onNext [Thread Main] -> Publisher m = mapper(...) -> m.subscribe(new FluxFlatMap.FlatMapInner()) -> FluxFlatMap.FlatMapInner.onNext("A") [Thread Elastic N] -> LambdaSubscriber.onNext("A") [Thread Elastic N] -> FluxFlatMap.FlatMapInner.onComplete() [Thread Elastic N] -> FluxFlatMap.FlatMapMain.innerComplete() [Thread Elastic N] -> FluxFlatMap.FlatMapMain.drainLoop() [Thread Elastic N] { ... subscription.request(amountOfCompletedInners) -> FluxFlatMap.FlatMapMain.onNext() [Thread Elastic N] -> ... LambdaSubscriber.onNext("C") [Thread Elastic N] -> ...

Thus, you see the first request(2) on main and then request(1) from the elastic threads: each time one inner completes, flatMap requests one more element from upstream to keep the number of in-flight inners at the concurrency level.
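The replenishing pattern can be reproduced with plain JDK classes. The following is a rough sketch under loud assumptions: ReplenishOnWorkerSketch, ArraySubscription and FlatMapLikeSubscriber are hypothetical names, a fixed thread pool with threads named worker-N stands in for Schedulers.elastic(), and the real drain loop, queues and error handling of FluxFlatMap are left out. It only shows that the thread finishing an inner task is also the thread that sends request(1) upstream.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for flatMap's demand replenishment; not Reactor's implementation.
final class ReplenishOnWorkerSketch {

    /** Synchronous source: emits up to n items on whichever thread calls request(n). */
    static final class ArraySubscription implements Flow.Subscription {
        private final String[] items;
        private final Flow.Subscriber<? super String> subscriber;
        private final List<String> requestLog;
        private int index;

        ArraySubscription(String[] items, Flow.Subscriber<? super String> subscriber,
                          List<String> requestLog) {
            this.items = items;
            this.subscriber = subscriber;
            this.requestLog = requestLog;
        }

        @Override
        public synchronized void request(long n) {
            requestLog.add("request(" + n + ") on " + Thread.currentThread().getName());
            for (long i = 0; i < n && index < items.length; i++) {
                subscriber.onNext(items[index++]);
            }
        }

        @Override
        public void cancel() { }
    }

    /** Requests `concurrency` up front, then one more each time an inner task finishes. */
    static final class FlatMapLikeSubscriber implements Flow.Subscriber<String> {
        private final int concurrency;
        private final ExecutorService workers;
        private final CountDownLatch innersDone;
        private volatile Flow.Subscription upstream;

        FlatMapLikeSubscriber(int concurrency, ExecutorService workers, CountDownLatch innersDone) {
            this.concurrency = concurrency;
            this.workers = workers;
            this.innersDone = innersDone;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            s.request(concurrency); // runs on the subscribing thread
        }

        @Override
        public void onNext(String item) {
            // The "inner" finishes on a worker thread, so the replenishing
            // request(1) is issued from that worker thread as well.
            workers.execute(() -> {
                upstream.request(1);
                innersDone.countDown();
            });
        }

        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    static List<String> run(int concurrency, String... items) {
        List<String> requestLog = new CopyOnWriteArrayList<>();
        AtomicInteger ids = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(
                concurrency, r -> new Thread(r, "worker-" + ids.incrementAndGet()));
        CountDownLatch innersDone = new CountDownLatch(items.length);
        try {
            FlatMapLikeSubscriber subscriber =
                    new FlatMapLikeSubscriber(concurrency, workers, innersDone);
            subscriber.onSubscribe(new ArraySubscription(items, subscriber, requestLog));
            innersDone.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdown();
        }
        return requestLog;
    }

    public static void main(String[] args) {
        run(2, "a", "b", "c").forEach(System.out::println);
    }
}
```

The first logged entry is request(2) on the calling thread and every later entry is request(1) on one of the worker threads. Which worker handles which item varies between runs, just as the elastic thread names vary in the log from the question.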

Upvotes: 3
