Reputation: 13923
Flux.just("a", "b", "c")
.log(null, Level.INFO,true) // line 18
.flatMap(value -> Mono.just(value.toUpperCase())
.publishOn(Schedulers.elastic()), 2)
.log(null, Level.INFO,true) // line 21
.subscribe();
Some of the output:
13:03:46 [main] INFO - | request(2) Flux.log(App.java:18)
13:03:46 [main] INFO - | onNext(a) Flux.log(App.java:18)
13:03:46 [main] INFO - | onNext(b) Flux.log(App.java:18)
13:03:46 [elastic-2] INFO - onNext(A) Flux.log(App.java:21)
13:03:46 [elastic-2] INFO - | request(1) Flux.log(App.java:18)
13:03:46 [main] INFO - | onNext(c) Flux.log(App.java:18)
13:03:46 [elastic-3] INFO - onNext(B) Flux.log(App.java:21)
13:03:46 [elastic-3] INFO - | request(1) Flux.log(App.java:18)
13:03:46 [elastic-2] INFO - onNext(C) Flux.log(App.java:21)
13:03:46 [elastic-2] INFO - | request(1) Flux.log(App.java:18)
13:03:46 [main] INFO - | onComplete() Flux.log(App.java:18)
13:03:46 [main] INFO - onComplete() Flux.log(App.java:21)
Questions:
Why does flatMap
ask for 2 elements from the main
thread and then ask for more elements from the other threads?
Why isn't the subscribe
handled by the main
thread?
Upvotes: 2
Views: 1350
Reputation: 12184
The first Subscription.request
amount depends on your specified concurrency level, which is 2
. Since you call .subscribe
in the main thread, the first prefetch
request will be called exactly on that thread.
Let's take a look at the next schema:
.subscribe()[Thread main]
-> FluxLog.source.subscribe()[Tread Main]
-> FluxFlatMap.source.subscribe()[ThreadMain]
-> FluxJust.subscriber.onSubscribe()
-> FluxFlatMap.subscription.request(concurrency)[Thread Main]
Then, starting from that point will be hardcore :). Since your inner stream will be subscribed by FlatMapInner
which will observe all signals (onNext, onError, onComplete) on Scheduler.elastic
(because of your .publishOn
). In turn, when the inner stream has been completed, the FlatMapInnner
on its onComplete
will notify main FlatMapMain
which is a driver of whole flatMap
mechanism. Interaction between FlatMapInner
and FlatMapMain
is being over FlatMapMain.innerComplete
. Since, from the FlatMapMain perspective, internal FlatMapInner
is playing the role of Queue
, all elements will be drained
. Keep calm, don't panic if you have no idea whata f*** is going on here. All idea of that method is drain data from the inner stream and emit it to downstream, and then request new portion of data to upstream. The thing that you should remember is that innerComplete
was called from FlatMapInner.onComplete
which was moved to another Scheduler, so it means that next Subscription.request
will be called from the thread specified in Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic())
So, schematically that process looks like next:
FluxFlatMap.FlatMapMain.onNext [Thread Main]
-> Publisher m = mapper(...)
-> m.subscribe(new FluxFlatMap.FlatMapInner())
-> FluxFlatMap.FlatMapInner.onNext("a") [Thread Elastic N]
-> LambdaSubscriber.onNext("c") [Thread Elastic N]
-> FluxFlatMap.FlatMapInner.onComplete() [Thread Elastic N]
-> FluxFlatMap.FlatMapMain.drainLoop() [Thread Elastic N]
-> FluxFlatMap.FlatMapMain.drainLoop() [Thread Elastic N] { ... subscription.request(amountOfCompletedInners)
-> FlatMap.FlatMapMain.onNext() [Thread Elastic N]
-> .... LambdaSubscriber.onNext("c") [Thread Elastic N]
-> ....
Thus, you will see the first request(2) on main and then request(1) from elastic (because one inner has been completed so FlatMap will request another 1 element from upstream to satisfy the demand of concurrency).
Upvotes: 3