Reputation: 161
I have two Mono<T>
that i have got from two different sources let us say KAFKA
.
My intention is to merge both these Mono
into a Flux<T>
. 1
Then use public final Mono<T> reduce(BiFunction<T,T,T> aggregator)
method in Flux
to create a final Mono
out of it (as the response time of above two Mono
may vary). 2
approach:
There are many methods such as contact
, zip
, zipWith
to use on Flux
. How do i arrive at a correct method to use (Two Mono
to Flux
conversion i.e, 1).
And is this REDUCE
approach really correct or is there anything else could be done to improvise it (2) ? Thanks.
Upvotes: 2
Views: 4610
Reputation: 72254
If you really want to use a Flux
to do this, then you'd likely want to use merge()
, similar to:
Flux.merge(mono1(), mono2()).reduce((obj1, obj2) -> foo(obj1, obj2));
...where foo()
fulfils the functionality of the reduce
method in the question, combining both objects emitted into a single value. You wouldn't want to use concat()
unless you want to subscribe to each Mono
one at a time, waiting for each to complete, rather than all together - and the Flux.zipXXX
series of operators would be used for zipping separate fluxes together, so you wouldn't want that.
However, I don't think you quite have the correct approach here for two values - if you want to put two Mono
publishers into a Flux
and then immediately reduce them back to a Mono
, then it doesn't make much sense to use a Flux
at all, since you have to wait for both the publishers to complete before emitting anything, and then you're just emitting a single value.
Instead, I'd recommend using this variant of Mono.zip()
, which allows you to do everything you need in one go, something like:
Mono.zip(mono1(), mono2(), (obj1, obj2) -> foo(obj1, obj2));
Upvotes: 2