Reputation: 1013
I have a problem publishing an object to multiple methods. A simplified version of my code is given below.
package org.example.reactive;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;

@Slf4j
public class MonoTest {

    public static void main(String... args) {
        MonoTest m = new MonoTest();
        Mono<A> aMono = m.getA();
        Mono<B> bMono = aMono.flatMap(m::getB);
        Mono<C> cMono = aMono.flatMap(m::getC);
        Mono<D> dMono = cMono.zipWith(bMono).flatMap(m::getD);
        Mono<E> eMono = Mono.zip(aMono, cMono, dMono)
                .flatMap(m::getE);
        aMono
                .zipWith(eMono)
                .subscribe(m::onCompleted, m::onFailed);
    }

    private Mono<A> getA() {
        log.info("inside getA");
        return Mono.just(new A());
    }

    private Mono<B> getB(A a) {
        log.info("inside getB");
        return Mono.just(new B());
    }

    private Mono<C> getC(A a) {
        log.info("inside getC");
        return Mono.just(new C());
    }

    private Mono<D> getD(Tuple2 t) {
        log.info("inside getD");
        return Mono.just(new D());
    }

    private Mono<E> getE(Tuple3 t) {
        log.info("inside getE");
        return Mono.just(new E());
    }

    private void onCompleted(Tuple2 t) {
        log.info("inside onCompleted");
    }

    private void onFailed(Throwable t) {
        log.info("inside onFailed");
    }

    class A {}
    class B {}
    class C {}
    class D {}
    class E {}
}
I expect each method to be called only once, but getC is invoked twice. What is wrong here? The program output is as follows:
org.example.reactive.MonoTest - inside getA
org.example.reactive.MonoTest - inside getC
org.example.reactive.MonoTest - inside getC
org.example.reactive.MonoTest - inside getB
org.example.reactive.MonoTest - inside getD
org.example.reactive.MonoTest - inside getE
org.example.reactive.MonoTest - inside onCompleted
EDIT
Well, I could solve it by caching as follows.
Mono<A> aMono = m.getA().cache();
Mono<B> bMono = aMono.flatMap(m::getB).cache();
Mono<C> cMono = aMono.flatMap(m::getC).cache();
Mono<D> dMono = cMono.zipWith(bMono).flatMap(m::getD).cache();
Upvotes: 2
Views: 2196
Reputation: 28301
There are two patterns in your set of Monos:
- aMono is a constant and is resolved eagerly once, due to direct variable assignment (you call getA() once).
- The other Monos call getX() methods from within operators, notably flatMap, which means these calls are performed lazily, each time the flatmapping Mono is subscribed to.
aMono is the only top-level invocation of a getX() method.
Replace the Mono variables with their definitions, except for aMono, and it should become clearer what happens:
MonoTest m = new MonoTest();
Mono<A> aMono = m.getA();                         // <-- getA log
aMono.zipWith(
    Mono.zip(
        aMono,
        aMono.flatMap(m::getC),                   // <-- getC log
        aMono.flatMap(m::getC)                    // <-- getC log
            .zipWith(aMono.flatMap(m::getB))      // <-- getB log
            .flatMap(m::getD)                     // <-- getD log
    ).flatMap(m::getE)                            // <-- getE log
).subscribe(...);
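cMono appears twice in the expanded chain (once directly inside Mono.zip and once inside dMono), and each occurrence is subscribed to separately, so getC runs once per subscription. That is why you get the number and order of logs you reported.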
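The lazy, per-subscription behavior can be reproduced without Reactor. This is a rough plain-Java analogy, not Reactor code. It assumes that a cold Mono behaves like a java.util.function.Supplier that redoes its work for every caller, and that cache() behaves like a memoizing wrapper. ColdPipelineSketch, memoize and countGetCCalls are hypothetical names made up for the illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java analogy (an assumption, not Reactor): each Supplier stands in for
// a cold Mono, and calling get() stands in for subscribing to it.
class ColdPipelineSketch {

    // Hypothetical helper: runs the wrapped supplier at most once and replays
    // the stored value afterwards, roughly what cache() does for a Mono.
    // Not thread-safe; good enough for a single-threaded illustration.
    static <T> Supplier<T> memoize(Supplier<T> source) {
        Object[] holder = new Object[1];
        boolean[] done = new boolean[1];
        return () -> {
            if (!done[0]) {
                holder[0] = source.get();
                done[0] = true;
            }
            @SuppressWarnings("unchecked")
            T value = (T) holder[0];
            return value;
        };
    }

    // Builds the same dependency graph as the question (e needs a, c and d;
    // d needs c and b) and returns how many times the "getC" work ran.
    static int countGetCCalls(boolean cached) {
        AtomicInteger getCCalls = new AtomicInteger();

        Supplier<String> a = () -> "A";
        Supplier<String> cRaw = () -> {
            getCCalls.incrementAndGet(); // stands in for the "inside getC" log
            return "C(" + a.get() + ")";
        };
        Supplier<String> c = cached ? memoize(cRaw) : cRaw;
        Supplier<String> b = () -> "B(" + a.get() + ")";
        Supplier<String> d = () -> "D(" + c.get() + "," + b.get() + ")";
        Supplier<String> e = () -> "E(" + a.get() + "," + c.get() + "," + d.get() + ")";

        e.get(); // the single terminal "subscription"
        return getCCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("getC calls without memoize: " + countGetCCalls(false));
        System.out.println("getC calls with memoize:    " + countGetCCalls(true));
    }
}
```

Without the wrapper, the getC work runs twice because c is consumed in two places, matching the duplicated log line. With the wrapper it runs once, which is the effect the cache() calls in the question's edit are after.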
Upvotes: 3