nihar

Reputation: 161

Merging two Mono and getting a Flux. Then extracting a Mono from that Flux

I have two Mono<T> instances that I have obtained from two different sources, say Kafka.

My intention is to merge both of these Monos into a Flux<T>. (1)

Then use the public final Mono<T> reduce(BiFunction<T,T,T> aggregator) method on Flux to create a final Mono out of it (as the response times of the two Monos above may vary). (2)

approach:

There are many methods available on Flux, such as concat, zip, and zipWith. How do I arrive at the correct one to use for converting two Monos into a Flux (i.e. step 1)?

And is this reduce approach really correct, or could anything else be done to improve it (step 2)? Thanks.

Upvotes: 2

Views: 4610

Answers (1)

Michael Berry

Reputation: 72254

If you really want to use a Flux to do this, then you'd likely want to use merge(), similar to:

Flux.merge(mono1(), mono2()).reduce((obj1, obj2) -> foo(obj1, obj2));

...where foo() fulfils the functionality of the reduce method in the question, combining both objects emitted into a single value. You wouldn't want to use concat() unless you want to subscribe to each Mono one at a time, waiting for each to complete, rather than all together - and the Flux.zipXXX series of operators would be used for zipping separate fluxes together, so you wouldn't want that.
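To make the merge() versus concat() difference concrete, here is a minimal sketch. The class name, the two source methods, and the delays are hypothetical stand-ins for the real publishers, and Integer::sum plays the role of foo(). One thing to keep in mind: merge() delivers values in arrival order, so the function passed to reduce() should not depend on which source answers first.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MergeReduceSketch {

    // Hypothetical stand-ins for the two sources; the delays mimic varying response times.
    static Mono<Integer> slowSource() {
        return Mono.just(1).delayElement(Duration.ofMillis(200));
    }

    static Mono<Integer> fastSource() {
        return Mono.just(2).delayElement(Duration.ofMillis(50));
    }

    // merge() subscribes to both sources eagerly, so the total wait is
    // roughly that of the slower source.
    static Mono<Integer> mergedSum() {
        return Flux.merge(slowSource(), fastSource()).reduce(Integer::sum);
    }

    // concat() subscribes to the second source only after the first has
    // completed, so the waits add up.
    static Mono<Integer> concatSum() {
        return Flux.concat(slowSource(), fastSource()).reduce(Integer::sum);
    }

    public static void main(String[] args) {
        // block() is used only to keep the demo short; avoid it in reactive pipelines.
        System.out.println(mergedSum().block()); // prints 3
        System.out.println(concatSum().block()); // prints 3
    }
}
```

Both variants produce the same result here because addition is commutative; only the subscription timing differs.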

However, I don't think you quite have the correct approach here for two values - if you want to put two Mono publishers into a Flux and then immediately reduce them back to a Mono, then it doesn't make much sense to use a Flux at all, since you have to wait for both the publishers to complete before emitting anything, and then you're just emitting a single value.

Instead, I'd recommend using the variant of Mono.zip() that takes two Monos and a combinator function, which allows you to do everything you need in one go, something like:

Mono.zip(mono1(), mono2(), (obj1, obj2) -> foo(obj1, obj2));
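As a rough, self-contained sketch of the zip approach (the class name, the two source methods, and the body of foo() are made up for illustration), note that the two Monos do not even need to share a type, which reduce() would require:

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

public class ZipSketch {

    // Hypothetical sources with different element types and response times.
    static Mono<String> mono1() {
        return Mono.just("a").delayElement(Duration.ofMillis(100));
    }

    static Mono<Integer> mono2() {
        return Mono.just(1).delayElement(Duration.ofMillis(30));
    }

    // Hypothetical combinator standing in for foo() from the answer.
    static String foo(String left, Integer right) {
        return left + ":" + right;
    }

    // Subscribes to both Monos eagerly and applies foo() once both have emitted.
    static Mono<String> combined() {
        return Mono.zip(mono1(), mono2(), ZipSketch::foo);
    }

    public static void main(String[] args) {
        // block() is used only to keep the demo short.
        System.out.println(combined().block()); // prints a:1
    }
}
```

One behavioural difference worth knowing: if either source completes empty, Mono.zip() completes empty as well, whereas merge() followed by reduce() would emit the single value that did arrive.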

Upvotes: 2
