Reputation: 930
I've got a very simple stream based on the book of Hands-On Reactive Programming in Spring 5
.
Flux.just(1, 2, 3).publishOn(Schedulers.elastic()))
.concatMap(i -> Flux.range(0, i).publishOn(Schedulers.elastic()))
.subscribe(log::info);
However, there's no console output at all. But if I add doOnNext
after just
:
Flux.just(1, 2, 3).doOnNext(log::debug).publishOn(Schedulers.elastic()))
.concatMap(i -> Flux.range(0, i).publishOn(Schedulers.elastic()))
.subscribe(log::info);
then I can get both output of debug and info. May I know why?
Edit 1: There's the console output of the following stream:
Flux.just(1, 2, 3).doOnNext(log::debug)
.publishOn(Schedulers.elastic())).doOnNext(log::warn)
.concatMap(i -> Flux.range(0, i).publishOn(Schedulers.elastic()))
.subscribe(log::info);
And output:
[main] INFO ReactiveTest - 1
[main] INFO ReactiveTest - 2
[elastic-2] WARN ReactiveTest - 1
[main] INFO ReactiveTest - 3
[elastic-2] DEBUG ReactiveTest - 0
[elastic-2] WARN ReactiveTest - 2
[elastic-2] DEBUG ReactiveTest - 0
[elastic-2] DEBUG ReactiveTest - 1
[elastic-2] WARN ReactiveTest - 3
[elastic-2] DEBUG ReactiveTest - 0
[elastic-2] DEBUG ReactiveTest - 1
[elastic-2] DEBUG ReactiveTest - 2
I think the log messages prove that the function in subscribe
will be called at the same thread as the function of concatMap
.
Upvotes: 0
Views: 1266
Reputation: 766
Your first program is probably terminating right after you call subscribe
. From the docs of subscribe
:
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
In second program, doOnNext
is invoked in the middle of processing, so it has time to output all the results. If you run the program many times you will see that it sometimes is not able to output the second log.
Upvotes: 1