Vento
Vento

Reputation: 81

Execute three Mono in parallel as soon they are created, wait for all to finish and collect the results with a specific order/logic

I'm new to Spring WebFlux so please, be gentle... I'm sorry if I'm missing some obvious thing but I tried looking for online examples and every time I end up with sequential calls.

I have this situation:

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class);
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class);
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class);

Response is a class in my project but for this example we can consider them as simple containers of a single List each.

I'd like to:

How do I achieve this? (since I failed multiple times)

My first attempt was:

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
ParallelFlux.from(mono1, mono2, mono3).then().block(); // im not sure if this really execute them in parallel
Response resp1 = mono1.block();
Response resp2 = mono2.block();
Response resp3 = mono3.block();
if (resp1.isNotEmpty()) {
    return resp1;
}
if (resp2.isNotEmpty()) {
    return resp2;
}
return resp3;

This does not seems to work, does ParallelFlux.from(mono1, mono2, mono3).then().block() really run those monos in parallel? Also why do I need ParallelFlux? Can't I just say "run this mono on a separate thread" as soon I create each mono? Each .block() actually redo the call.... like its re executing the mono... why?

UPDATE:

By reading the comments I changed my code to this:

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());

Tuple3<Response, Response, Response> all = Mono.zip(mono1, mono2, mono3).block();

Response resp1 = all.getT1();
Response resp2 = all.getT2();
Response resp3 = all.getT3();

if (resp1.hasMessages()) {
    return resp1;
}
if (resp2.hasMessages()) {
    return resp2;
}
return resp3;

Now it seems to work. Do I need to do something else or I'm ok with this solution? Should I also change Mono.zip(mono1, mono2, mono3).block() in Mono.zip(mono1, mono2, mono3).subscribeOn(Schedulers.parallel()).block() ? p.s. I'm reading the docs again now and I think I should use Schedulers.elastic() insted of Schedulers.parallel().

Upvotes: 3

Views: 3452

Answers (1)

Akhil Bojedla
Akhil Bojedla

Reputation: 2218

Creating a mono doesn't automatically execute it. You need a terminal operator like subscribe or block to trigger the execution (subscribeOn is not a terminal operator. You don't need it unless you want to defer your execution to a different thread pool. By default it uses a default thread pool). If you want multiple mono's to run in parallel you can use zip operator.

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());

return Mono.zip(mono1, mono2, mono3)
        .map(t -> {
            if (t.getT1().isEmpty()) {
                if (t.getT2().isEmpty()) {
                    return t.getT3();
                } else {
                    return t.getT2();
                }
            } else {
                return t.getT1();
            }
        });

Note: Calling this doesn't execute and give you result. It gives you back a mono on which you can call a subscribe() to get the result.

Upvotes: 6

Related Questions