Laurent Perez
Laurent Perez

Reputation: 558

Java with Project Reactor : why is the Mono block() not seeing the error?

Given the following code, result.block() equals "xx", isError() == false, yet the boom error handler is ran, the stacktrace is shown, the promise is not completed. I would expect the result to be "ko".

What am I doing wrong ? block() Javadoc says

will return null if onComplete, T if onNext

public class RApp {

static final Logger LOG = LoggerFactory.getLogger(RApp.class);

public static void main(String[] args) {

    MonoProcessor<String> texecute = MonoProcessor.create();
    Mono<String> result = texecute.delaySubscription(Duration.ofSeconds(2))
            .onErrorReturn("ko")
            .doOnNext(s -> parse(s)
            .doOnSuccess(p -> LOG.info("promise completed {}", p))
            .doOnTerminate((z, e) -> LOG.info("term value: {} , {}", z, e))
            .doOnError(t -> {
                LOG.error("boom", t);
            })
            .subscribe());

    texecute.onNext("xx");

    LOG.info("...............;");
    String block = result.block();
    LOG.info("r={}", block);
    boolean error = texecute.isError();
    LOG.info(error ? "error" : "no error");
    texecute.dispose();

}

public static Mono<String> parse(String s) {
    System.out.println("parse s = " + s);
    if (s.equals("xx")) {
        return Mono.error(new Exception("no xx"));
    }
    return Mono.just(s);
}
}

Upvotes: 1

Views: 8809

Answers (1)

Laurent Perez
Laurent Perez

Reputation: 558

Answering myself on this one : the do* are side-effects methods not modifying the sequence per https://projectreactor.io/docs/core/release/reference/#error.handling, and the ordering of onErrorReturn matters.

Proper working solution below, with bonuses of reactor.core.Exceptions.propagate to wrap checked exceptions and a java 8 failure counter :

    LongAdder failureStat = new LongAdder();

    MonoProcessor<String> texecute = MonoProcessor.create();
    Mono<String> result = texecute
            .delaySubscription(Duration.ofSeconds(2))
            .map(e -> parse2(e)).doOnError(e -> {
                failureStat.increment();
            }).doOnSuccess(s -> {
                LOG.info("success {}", s);
            })
            .onErrorReturn("ko")
            .subscribe();

    texecute.onNext("xx");

    LOG.info("...............;");
    String block = result.block();
    LOG.info("r={}", block);
    System.out.println("failureStat = " + failureStat);
    texecute.dispose();

public static String parse2(String s) {
    System.out.println("parse s = " + s);
    if (s.equals("xx")) {
        try {
            throw new Exception("no xx");
        } catch (Exception e) {
            throw Exceptions.propagate(e);
        }
    }
    return s;
}

Upvotes: 2

Related Questions