Reputation: 558
Given the following code, result.block() returns "xx" and isError() is false, yet the "boom" error handler runs, the stack trace is logged, and the promise never completes. I would expect the result to be "ko".
What am I doing wrong? The block() Javadoc says:
will return null if onComplete, T if onNext
public class RApp {
    static final Logger LOG = LoggerFactory.getLogger(RApp.class);

    public static void main(String[] args) {
        MonoProcessor<String> texecute = MonoProcessor.create();
        Mono<String> result = texecute.delaySubscription(Duration.ofSeconds(2))
            .onErrorReturn("ko")
            .doOnNext(s -> parse(s)
                .doOnSuccess(p -> LOG.info("promise completed {}", p))
                .doOnTerminate((z, e) -> LOG.info("term value: {} , {}", z, e))
                .doOnError(t -> {
                    LOG.error("boom", t);
                })
                .subscribe());
        texecute.onNext("xx");
        LOG.info("...............;");
        String block = result.block();
        LOG.info("r={}", block);
        boolean error = texecute.isError();
        LOG.info(error ? "error" : "no error");
        texecute.dispose();
    }

    public static Mono<String> parse(String s) {
        System.out.println("parse s = " + s);
        if (s.equals("xx")) {
            return Mono.error(new Exception("no xx"));
        }
        return Mono.just(s);
    }
}
Upvotes: 1
Views: 8809
Reputation: 558
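Answering my own question: the do* operators are side-effect methods that do not modify the sequence (see https://projectreactor.io/docs/core/release/reference/#error.handling), and the position of onErrorReturn in the chain matters, because it only recovers from errors raised upstream of it. In the question, parse(s) was built and subscribed inside the doOnNext callback, so its error never reached result.
A working solution is below, with two bonuses: reactor.core.Exceptions.propagate to wrap checked exceptions, and a Java 8 LongAdder as a failure counter: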
LongAdder failureStat = new LongAdder();
MonoProcessor<String> texecute = MonoProcessor.create();
Mono<String> result = texecute
    .delaySubscription(Duration.ofSeconds(2))
    .map(e -> parse2(e))
    .doOnError(e -> {
        failureStat.increment();
    })
    .doOnSuccess(s -> {
        LOG.info("success {}", s);
    })
    .onErrorReturn("ko")
    .subscribe();
texecute.onNext("xx");
LOG.info("...............;");
String block = result.block();
LOG.info("r={}", block);
System.out.println("failureStat = " + failureStat);
texecute.dispose();

public static String parse2(String s) {
    System.out.println("parse s = " + s);
    if (s.equals("xx")) {
        try {
            throw new Exception("no xx");
        } catch (Exception e) {
            throw Exceptions.propagate(e);
        }
    }
    return s;
}
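To see the ordering rule in isolation, here is a minimal sketch that uses the JDK's CompletableFuture as a stand-in for Mono, so it runs without Reactor on the classpath. It is an analogy, not Reactor code: whenComplete plays the role of a do* side-effect operator, and exceptionally plays the role of onErrorReturn. The class name OrderingSketch and its helper methods are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.LongAdder;

public class OrderingSketch {

    // Hypothetical stand-in for parse2: rejects "xx", passes anything else through.
    static String parse(String s) {
        if (s.equals("xx")) {
            throw new IllegalArgumentException("no xx");
        }
        return s;
    }

    // Fallback placed BEFORE the failing step: nothing has failed yet when it
    // is evaluated, so it never fires and the later error goes unhandled.
    static String fallbackBeforeParse(String input) {
        try {
            return CompletableFuture.completedFuture(input)
                    .exceptionally(t -> "ko")
                    .thenApply(OrderingSketch::parse)
                    .join();
        } catch (CompletionException e) {
            return "unhandled: " + e.getCause().getMessage();
        }
    }

    // Fallback placed AFTER the failing step: the side-effect callback observes
    // the error without changing the outcome, then the fallback replaces it.
    static String fallbackAfterParse(String input, LongAdder failures) {
        return CompletableFuture.completedFuture(input)
                .thenApply(OrderingSketch::parse)
                .whenComplete((value, error) -> {
                    if (error != null) {
                        failures.increment();
                    }
                })
                .exceptionally(t -> "ko")
                .join();
    }

    public static void main(String[] args) {
        LongAdder failures = new LongAdder();
        System.out.println(fallbackBeforeParse("xx"));
        System.out.println(fallbackAfterParse("xx", failures));
        System.out.println("failures = " + failures.sum());
    }
}
```

With the fallback first, the error from parse surfaces to the caller. With the fallback last, the caller gets "ko" and the counter records one failure, which mirrors why onErrorReturn has to come after map in the Reactor chain.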
Upvotes: 2