Reputation: 129
At what point does Spring WebFlux perform the subscription? Everything I have read says that there must be a subscription, otherwise nothing happens. In my short time with Spring WebFlux, I have never seen a subscribe() call, in either the controllers or the services.
I have the same doubt when using flatMap(), map(), etc.: at what point does the subscription take place?
What I have read so far does not really resolve my doubts.
public Flux method(){
....
myFlux.flatMap(data -> {
....
}).flatMap(e -> { .... });
}
I know this is an asynchronous issue, but does each flatMap run at the same time? I have also noticed that some of the data is sometimes null.
Upvotes: 6
Views: 8958
Reputation: 17460
The answer is: it depends.
For example, if this is a Spring controller method, then it is the framework itself that subscribes to the Mono or Flux.
If it is a method that is triggered from time to time by a scheduler, then you must explicitly subscribe to the Mono or Flux; otherwise, no processing will take place.
This means that if your application only exposes a REST API and no processing needs to be triggered in any other way, then it is very likely that you will never need to explicitly subscribe to a Mono or Flux, because Spring will take care of that for you.
Upvotes: 0
Reputation: 9947
It's the framework (spring-webflux) that subscribes to the returned Mono or Flux. For example, if you use Netty (the default), then, based on my debugging, the subscription happens here:
https://github.com/reactor/reactor-netty/blob/db27625064fc78f8374c1ef0af3160ec3ae979f4/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java#L962
Also, this article may help you understand what happens when: https://spring.io/blog/2019/03/06/flight-of-the-flux-1-assembly-vs-subscription
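To make the assembly/subscription split concrete, here is a minimal sketch that models it with the JDK's java.util.concurrent.Flow interfaces (the JDK counterpart of the Reactive Streams interfaces that Mono and Flux implement). It is not Reactor code: the class AssemblyVsSubscription and the helpers range and map are hypothetical, written only for this illustration, and the source ignores the requested demand for brevity. It only aims to show that building the pipeline runs nothing, and that work starts once something calls subscribe, which for a WebFlux controller is the framework.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

class AssemblyVsSubscription {

    /** Hypothetical cold source: emits 1..count, but only after a subscriber requests data. */
    static Flow.Publisher<Integer> range(int count, List<String> log) {
        return subscriber -> {
            log.add("subscribed");
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean started;

                @Override
                public void request(long n) {
                    // Simplification: ignores n and emits everything on the first request.
                    if (started) {
                        return;
                    }
                    started = true;
                    for (int i = 1; i <= count; i++) {
                        subscriber.onNext(i);
                    }
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {
                    started = true;
                }
            });
        };
    }

    /** Hypothetical operator: wraps the source and subscribes upstream only when it is itself subscribed to. */
    static <T, R> Flow.Publisher<R> map(Flow.Publisher<T> source, Function<T, R> fn) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(T item) { downstream.onNext(fn.apply(item)); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    /** Assembles a pipeline, then subscribes to it, and returns the order in which things happened. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Function<Integer, String> label = i -> {
            log.add("map " + i);
            return "item-" + i;
        };

        // Assembly: only wrapper objects are created here; label is not called yet.
        Flow.Publisher<String> pipeline = map(range(3, log), label);
        log.add("assembled");

        // Subscription: this call is what makes data start flowing.
        pipeline.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { log.add("received " + item); }
            @Override public void onError(Throwable t) { log.add("error " + t); }
            @Override public void onComplete() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints "assembled" first, then "subscribed", and only after that the "map" and "received" entries. With Reactor, the analogous pipeline would be roughly Flux.range(1, 3).map(...), with subscribe(...) as the trigger.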
Upvotes: 7
Reputation: 126
You need to call .subscribe() or block() after your flatMap. Here's an example.
Assuming that myFlux is of type Flux, the following will trigger the subscription for the example above:
myFlux.subscribe(System.out::println);
Here's an explanation on a separate StackOverflow thread.
But your method() function returns a Flux object, so it's up to the consumer of method() to decide how it wants to subscribe to the Flux. You shouldn't be trying to subscribe to the Flux from within the method itself.
Upvotes: 0