Reputation: 3586
Consider the following code:
@Slf4j
@ExtendWith(MockitoExtension.class)
class ConnectionEventsConsumerTest {
@Test
public void testOnErrorResume() {
Flux.range(0, 5)
.doOnNext(event -> log.info("Processing - {}", event))
.flatMap(event -> processEvent(event)
.doOnSuccess(result -> log.info("Processed - {}", event))
.onErrorResume(t -> handleError(t, event))
)
.doOnError(t -> log.error("Exception propagated", t))
//.log()
.then()
.subscribe();
}
private Mono<Void> processEvent(Object object) {
return Mono.error(() -> new RuntimeException("test"));
//throw new RuntimeException("test");
}
private Mono<Void> handleError(Throwable throwable, Object object) {
log.error("Processing Failed - {}", object);
return Mono.empty();
}
}
The output is completely different if the method processEvent returns a Mono.error than if it throws an Exception.
The code as it is (returning a Mono.error), I see what I expected, 300 iterations of Processing and Processins Failed, and I see no Exception propagated.
17:33:19.853 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 0 17:33:19.864 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing Failed - 0 17:33:19.865 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 1 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing Failed - 1 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 2 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing Failed - 2 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 3 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing Failed - 3 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 4 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing Failed - 4
On the other hand, if I uncomment the throw, I see a single item from the Flux being processed, I do not see the message from handleError and I see the "Exception Propagated"
17:35:53.950 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Processing - 0 17:35:53.968 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
- Exception propagated java.lang.RuntimeException: test
If this is by design, what are the best practices for the flatMap? On easy solution that comes to mind is to surround th content of the flatMap by a try-catch to wrap the exception in a Mono.error. While it works, it is inelegant and too manual, likely to be forgotten.
Upvotes: 2
Views: 2126
Reputation: 9947
A method creating/returning a Mono
should not throw exception in such way. Since the exception is thrown before Mono
is assembled (created), the subsequent operators inside the flatMap
can't possibly take effect since they need a Mono
to operate on.
If you have no control over the processEvent()
method to fix its behaviour then you can wrap it with a Mono.defer
which will ensure that even the errors raised during the assembly period will be propagated through the Mono
inside the flatMap
:
Flux.range(0, 5)
.doOnNext(event -> log.info("Processing - {}", event))
.flatMap(event -> Mono.defer(() -> processEvent(event))
.doOnSuccess(result -> log.info("Processed - {}", event))
.onErrorResume(t -> handleError(t, event)))
.doOnError(t -> log.error("Exception propagated", t))
private Mono<Void> processEvent(Object object) {
throw new RuntimeException("test");
}
Note that inside other intermediate operators like map
or doOnNext
you are free to throw exception in the ugly way as Reactor can transform them into proper error signals since at that point a Mono
is already in progress.
Upvotes: 3