Reputation: 59
I have basic rest controller in Spring.To try out spring webflux and understand its non blocking nature.I created two controller mappings one to read and and one to serve the webclient call(as shown below)
@GetMapping("/slow-service-tweets")
private List<String> getAllTweets() {
try {
Thread.sleep(2000L); // delay
} catch (InterruptedException e) {
e.printStackTrace();
}
return Arrays.asList(
"Item1", "Item2","Item3");
}
And here is my testing get api which just triggers the code given below(first version)
@GetMapping("/test")
public void doSomething(){
log.info("Starting NON-BLOCKING Controller!");
Flux<String> tweetFlux = WebClient.create()
.get()
.uri("http://localhost:9090/slow-service-tweets")
.retrieve()
.bodyToFlux(String.class);
tweetFlux.subscribe(tweet ->{
try {
log.info("i am back");
Thread.sleep(6000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(tweet.toString());});
log.info("Exiting NON-BLOCKING Controller!");
The above code behaves exactly as it should.The output is
Starting NON-BLOCKING Controller!
Exiting NON-BLOCKING Controller!
Item1
Item2
Item3
The reason being the thread does not block on subscribe of flux and proceeds ahead. Now please look in the second version of this below.
@GetMapping("/test")
public void doSomething(){
System.out.println("i am here");
Flux<Integer> f= Flux.just(1,2,3,4,5);
// Flux<Integer> f= Flux.fromIterable(testService.returnStaticList());
f.subscribe(consumer->{
try {
log.info("consuming");
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(consumer);
});
log.info("doing something else");
}
Ideally like the earlier example "doing something else" must be printed immediately. But no matter what i do it takes 10 seconds to print all element and then prints "doing something else".Output below:
i am here
consuming
1
2
3
4
5
doing something else
Can anyone please explain what am i missing here??
Upvotes: 2
Views: 579
Reputation: 72284
I feel like I need to start with a warning - this is most certainly not how to use Webflux. Any time you call subscribe, you're almost certainly doing something wrong - that should be left to the framework. Instead, you should be using:
doOnNext()
for side effects like logging;delayElements()
if you want to delay each element in a Flux, rather than using Thread.sleep();Flux
(or returning some publisher created by chaining your Flux using operators) from your doSomething()
method, to allow the framework to subscribe and therefore execute your reactive chain.If you follow the above "normal" way of doing things, you likely won't run into these sorts of problems with blocking / threading causing unexpected problems. If you do more niche things like subscribe yourself, block threads without thinking if they should be blocked, etc. - then you'll likely cause yourself a world of issues.
However, to answer the question directly as to why this behaviour happens - it's because of threading. When you use Flux.just()
, you're using the immediate scheduler (which means the same thread that's actually executing your doSomething()
method in the first place.) Since there's only one thread at play here, your subscribe method blocks this one thread it until it's complete, so nothing else can execute. If you were to tell your Flux
to publish on the boundedElastic()
thread pool for instance like so, you'd find it behaves as you expect:
Flux<Integer> f = Flux.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.boundedElastic());
In your other example, when you use WebClient, it's publishing on a different thread to the one executing your method - so blocking this different thread in a subscriber doesn't hold up the overall executing of your doSomething()
method.
Upvotes: 2