Reputation: 149
I'm new to reactive programming and Project Reactor and am trying to understand the concepts. I created a Flux with the range method and subscribed to it. When I look at the log, everything runs on the main thread.
Flux
.range(1, 5)
.log()
.subscribe(System.out::println);
System.out.println("End of Execution");
[DEBUG] (main) Using Console logging
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
1
[ INFO] (main) | onNext(2)
2
[ INFO] (main) | onNext(3)
3
[ INFO] (main) | onNext(4)
4
[ INFO] (main) | onNext(5)
5
[ INFO] (main) | onComplete()
End of Execution
The rest of the code (System.out.println("End of Execution"); in the example above) ran only after the Publisher had emitted all of its elements. Does a Publisher block the thread by default? If I change the scheduler, it no longer seems to block the thread.
Flux
.range(1, 5)
.log()
.subscribeOn(Schedulers.elastic())
.subscribe(System.out::println);
System.out.println("End of Execution");
Thread.sleep(10000);
[DEBUG] (main) Using Console logging
End of Execution
[ INFO] (elastic-2) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (elastic-2) | request(unbounded)
[ INFO] (elastic-2) | onNext(1)
1
[ INFO] (elastic-2) | onNext(2)
2
[ INFO] (elastic-2) | onNext(3)
3
[ INFO] (elastic-2) | onNext(4)
4
[ INFO] (elastic-2) | onNext(5)
5
[ INFO] (elastic-2) | onComplete()
Upvotes: 4
Views: 1806
Reputation: 59221
Reactor does not enforce a concurrency model by default, and yes, many operators will continue the work on the Thread where the subscribe() operation happened.
But this doesn't mean that using Reactor will block the main thread. The sample you're showing does in-memory work only, with no I/O or latency involved. It also subscribes to the result right away.
You can try the following snippet and see something different:
Flux.range(1, 5)
.delayElements(Duration.ofMillis(100))
.log()
.subscribe(System.out::println);
System.out.println("End of Execution");
In the logs, I'm seeing:
INFO --- [main] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
INFO --- [main] reactor.Flux.ConcatMap.1 : request(unbounded)
End of Execution
In this case, delayElements schedules the emissions on a different thread (by default, Reactor's parallel Scheduler), so subscribe() returns immediately. Since nothing here is keeping the JVM alive, the application exits and no element from the range is consumed.
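To actually see the delayed elements arrive, something has to keep the JVM alive until the Flux completes. The following is a minimal sketch, assuming reactor-core is on the classpath; the class name DelayElementsDemo and the CountDownLatch-based wait are illustrative choices, not part of the original snippet. It records which thread handled each event, so the "End of Execution" entry from the calling thread should normally come first, followed by the five elements from scheduler threads.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.Flux;

// Hypothetical demo class: names are illustrative only.
class DelayElementsDemo {

    // Returns one "<thread name>: <event>" entry per event, in arrival order.
    static List<String> run() throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flux.range(1, 5)
            .delayElements(Duration.ofMillis(100))
            // Release the latch once the sequence terminates for any reason.
            .doFinally(signal -> done.countDown())
            .subscribe(i -> events.add(Thread.currentThread().getName() + ": " + i));

        // subscribe() has already returned; the first element is still ~100 ms away.
        events.add(Thread.currentThread().getName() + ": End of Execution");

        // Keep the calling thread (and therefore the JVM) alive until completion.
        done.await(5, TimeUnit.SECONDS);
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

The latch is only one way to wait; in a throwaway test, calling blockLast() on the Flux instead of subscribe() would have a similar effect, at the cost of blocking before "End of Execution" is reached.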
In a more common scenario, I/O and latency will be involved and that work will be scheduled in appropriate ways and will not block the main application thread.
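As a rough illustration of that scenario, the sketch below wraps a blocking call in Mono.fromCallable and moves it off the calling thread with subscribeOn. It assumes Reactor 3.3 or later, where Schedulers.boundedElastic() is available (the Schedulers.elastic() used in the question has since been deprecated). The slowLookup method is a hypothetical stand-in for real I/O, and block() appears only so the demo can return the result.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class: names are illustrative only.
class BlockingCallDemo {

    // Stand-in for a blocking I/O call; reports the thread it ran on.
    static String slowLookup() throws InterruptedException {
        Thread.sleep(200);
        return Thread.currentThread().getName();
    }

    // Runs the blocking call on the boundedElastic scheduler and returns
    // the name of the worker thread that executed it.
    static String run() {
        return Mono.fromCallable(BlockingCallDemo::slowLookup)
                   .subscribeOn(Schedulers.boundedElastic())
                   // Demo only: block so the result can be returned to the caller.
                   .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println("slowLookup ran on: " + run());
    }
}
```

In an application that is already reactive end to end, the final block() would normally be replaced by returning the Mono to the caller or framework.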
Upvotes: 4