Winster
Winster

Reputation: 1013

Reactor Mono publish to multiple methods

I have a problem in publishing an object to multiple methods. Simplified version of my code is given below.

package org.example.reactive;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;

@Slf4j
public class MonoTest {

    public static void main(String... args) {
        MonoTest m = new MonoTest();
        Mono<A> aMono = m.getA();
        Mono<B> bMono = aMono.flatMap(m::getB);
        Mono<C> cMono = aMono.flatMap(m::getC);
        Mono<D> dMono = cMono.zipWith(bMono).flatMap(m::getD);
        Mono<E> eMono = Mono.zip(aMono, cMono, dMono)
                .flatMap(m::getE);
        aMono
                .zipWith(eMono)
                .subscribe(m::onCompleted, m::onFailed);
    }

    private Mono<A> getA(){
        log.info("inside getA");
        return Mono.just(new A());
    }
    private Mono<B> getB(A a){
        log.info("inside getB");
        return Mono.just(new B());
    }
    private Mono<C> getC(A a){
        log.info("inside getC");
        return Mono.just(new C());
    }
    private Mono<D> getD(Tuple2 t){
        log.info("inside getD");
        return Mono.just(new D());
    }
    private Mono<E> getE(Tuple3 t){
        log.info("inside getE");
        return Mono.just(new E());
    }
    private void onCompleted(Tuple2 t){
        log.info("inside onCompleted");
    }
    private void onFailed(Throwable t){
        log.info("inside onFailed");
    }

    class A {}
    class B {}
    class C {}
    class D {}
    class E {}
}

I expect call to each method only once. But getC is invoked twice. What is wrong here? Program output is as follows

org.example.reactive.MonoTest - inside getA

org.example.reactive.MonoTest - inside getC

org.example.reactive.MonoTest - inside getC

org.example.reactive.MonoTest - inside getB

org.example.reactive.MonoTest - inside getD

org.example.reactive.MonoTest - inside getE

org.example.reactive.MonoTest - inside onCompleted

EDIT

Well, I could solve it by caching as follows.

        Mono<A> aMono = m.getA().cache();
        Mono<B> bMono = aMono.flatMap(m::getB).cache();
        Mono<C> cMono = aMono.flatMap(m::getC).cache();
        Mono<D> dMono = cMono.zipWith(bMono).flatMap(m::getD).cache();

Upvotes: 2

Views: 2196

Answers (1)

Simon Basl&#233;
Simon Basl&#233;

Reputation: 28301

There are two patterns in your set of Monos:

  • aMono is a constant and is resolved eagerly once, due to direct variable assingment (you call getA() once)
  • on the other hand, other monos call getX() methods from within operators, notably flatMap. Which means these calls are performed lazily, when the flatmapping mono is subscribed to

aMono is the only top-level invocation of a getX() method. Replace the Mono variables with their definition, except for aMono and it should become clearer what happens:

MonoTest m = new MonoTest();

Mono<A> aMono = m.getA(); // <-- getA log
aMono.zipWith(
    Mono.zip(
       aMono,
       aMono.flatMap(m::getC),  // <-- getC log
       aMono.flatMap(m::getC) // <-- getC log
            .zipWith(aMono.flatMap(m::getB)) // <-- getB log
            .flatMap(m::getD) // <-- getD log
    ).flatMap(m::getE) // <-- getE log
  ).subscribe(...);

which is why you get the number and order of logs you reported.

Upvotes: 3

Related Questions