Reputation: 81
I'm new to Spring WebFlux, so please be gentle. I'm sorry if I'm missing something obvious, but I looked for online examples and every time I end up with sequential calls.
I have this situation:
Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class);
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class);
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class);
Response is a class in my project, but for this example each Response can be treated as a simple container of a single List.
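I'd like to run all three calls in parallel, wait for all of them to complete, and then return the first non-empty response, checking mono1 first, then mono2, then mono3.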
How do I achieve this? (since I failed multiple times)
My first attempt was:
Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
ParallelFlux.from(mono1, mono2, mono3).then().block(); // I'm not sure if this really executes them in parallel
Response resp1 = mono1.block();
Response resp2 = mono2.block();
Response resp3 = mono3.block();
if (resp1.isNotEmpty()) {
return resp1;
}
if (resp2.isNotEmpty()) {
return resp2;
}
return resp3;
This does not seem to work. Does ParallelFlux.from(mono1, mono2, mono3).then().block() really run those Monos in parallel? Also, why do I need ParallelFlux? Can't I just say "run this Mono on a separate thread" as soon as I create each one? Each .block() actually redoes the call, as if it were re-executing the Mono. Why?
UPDATE:
By reading the comments I changed my code to this:
Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Tuple3<Response, Response, Response> all = Mono.zip(mono1, mono2, mono3).block();
Response resp1 = all.getT1();
Response resp2 = all.getT2();
Response resp3 = all.getT3();
if (resp1.hasMessages()) {
return resp1;
}
if (resp2.hasMessages()) {
return resp2;
}
return resp3;
Now it seems to work. Do I need to do anything else, or am I OK with this solution? Should I also change Mono.zip(mono1, mono2, mono3).block() to Mono.zip(mono1, mono2, mono3).subscribeOn(Schedulers.parallel()).block()?
P.S. I'm reading the docs again now and I think I should use Schedulers.elastic() instead of Schedulers.parallel().
Upvotes: 3
Views: 3452
Reputation: 2218
Creating a Mono doesn't automatically execute it. You need a terminal operation such as subscribe or block to trigger the execution. Each subscribe or block call is a separate subscription, which is why calling block() on the same Mono again repeats the request. subscribeOn is not a terminal operator, and you don't need it unless you want to move the execution to a different thread pool; otherwise the call runs on the default threads of the underlying HTTP client. If you want multiple Monos to run in parallel, you can use the zip operator.
Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
return Mono.zip(mono1, mono2, mono3)
.map(t -> {
if (t.getT1().isEmpty()) {
if (t.getT2().isEmpty()) {
return t.getT3();
} else {
return t.getT2();
}
} else {
return t.getT1();
}
});
Note: Calling this code doesn't execute the requests or give you the result. It gives you back a Mono, on which you can call subscribe() to get the result.
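The nested if/else inside map can also be pulled out into a small helper, which makes the "first non-empty response wins" rule easier to read and to test on its own. The following is only a sketch in plain Java: the Response class here is a hypothetical stand-in (the real class is not shown in the question), and ResponseSelector and firstNonEmpty are names made up for illustration.

```java
import java.util.List;

// Hypothetical stand-in for the question's Response class:
// a simple container of a single List.
final class Response {
    private final List<String> messages;

    Response(List<String> messages) {
        this.messages = List.copyOf(messages);
    }

    boolean isEmpty() {
        return messages.isEmpty();
    }
}

// Illustrative helper, not part of Reactor or Spring.
final class ResponseSelector {
    private ResponseSelector() {
    }

    // Returns the first non-empty response in priority order.
    // If the first two are empty, the third is returned as-is,
    // mirroring the nested if/else in the map lambda above.
    static Response firstNonEmpty(Response first, Response second, Response third) {
        if (!first.isEmpty()) {
            return first;
        }
        if (!second.isEmpty()) {
            return second;
        }
        return third;
    }
}
```

With such a helper, the lambda passed to map would shrink to something like t -> ResponseSelector.firstNonEmpty(t.getT1(), t.getT2(), t.getT3()), assuming the real Response exposes an isEmpty() method as in the code above.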
Upvotes: 6