J_D

Reputation: 3586

Exceptions thrown in a flatMap are ignored by the onErrorResume operator

Consider the following code:

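import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j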
@ExtendWith(MockitoExtension.class)
class ConnectionEventsConsumerTest {

    @Test
    public void testOnErrorResume() {
        Flux.range(0, 5)
                .doOnNext(event -> log.info("Processing -  {}", event))
                .flatMap(event -> processEvent(event)
                        .doOnSuccess(result -> log.info("Processed - {}", event))
                        .onErrorResume(t -> handleError(t, event))
                )
                .doOnError(t -> log.error("Exception propagated", t))
                //.log()
                .then()
                .subscribe();
    }

    private Mono<Void> processEvent(Object object) {
        return Mono.error(() -> new RuntimeException("test"));
        //throw new RuntimeException("test");
    }
    
    private Mono<Void> handleError(Throwable throwable, Object object) {
        log.error("Processing Failed - {}", object);
        
        return Mono.empty();
    }
    
}

The output differs completely depending on whether processEvent returns a Mono.error or throws an exception.

With the code as it is (returning a Mono.error), I see what I expected: 5 iterations of "Processing" and "Processing Failed", and no "Exception propagated".

17:33:19.853 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing - 0
17:33:19.864 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing Failed - 0
17:33:19.865 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing - 1
17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing Failed - 1
17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing - 2
17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing Failed - 2
17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing - 3
17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing Failed - 3
17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing - 4
17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing Failed - 4

On the other hand, if I uncomment the throw, only a single item from the Flux is processed, the message from handleError never appears, and I see "Exception propagated":

17:35:53.950 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing - 0
17:35:53.968 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Exception propagated
java.lang.RuntimeException: test

If this is by design, what are the best practices for flatMap? One easy solution that comes to mind is to surround the content of the flatMap with a try-catch that wraps the exception in a Mono.error. While it works, it is inelegant, too manual, and likely to be forgotten.

Upvotes: 2

Views: 2126

Answers (1)

Martin Tarjányi

Reputation: 9947

A method that creates and returns a Mono should not throw an exception this way. Because the exception is thrown before the Mono is assembled (created), the subsequent operators inside the flatMap cannot take effect: they need a Mono to operate on.
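To make the difference concrete, here is a minimal sketch that runs the question's pipeline against both variants and records which handler fires. The class name, the two stand-in methods `signalling` and `throwing`, and the `trace` list are assumptions made for illustration; only reactor-core is required.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class AssemblyVsSignalDemo {

    // Hypothetical stand-in: returns a Mono that signals the error on subscription.
    static Mono<Void> signalling(Integer event) {
        return Mono.error(() -> new RuntimeException("test"));
    }

    // Hypothetical stand-in: throws before any Mono exists.
    static Mono<Void> throwing(Integer event) {
        throw new RuntimeException("test");
    }

    // Runs the same shape of pipeline as the question and records what happened.
    static List<String> run(Function<Integer, Mono<Void>> processEvent) {
        List<String> trace = new ArrayList<>();
        Flux.range(0, 3)
                .flatMap(event -> processEvent.apply(event)
                        // Only reachable if processEvent actually returned a Mono.
                        .onErrorResume(t -> {
                            trace.add("handled " + event);
                            return Mono.empty();
                        }))
                .doOnError(t -> trace.add("propagated " + t.getMessage()))
                // Swallow the outer error so block() does not rethrow in this demo.
                .onErrorResume(t -> Mono.empty())
                .then()
                .block();
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run(AssemblyVsSignalDemo::signalling));
        System.out.println(run(AssemblyVsSignalDemo::throwing));
    }
}
```

With `signalling`, every element reaches the inner `onErrorResume`. With `throwing`, the mapper function itself fails, so flatMap turns that into an error on the outer Flux and the sequence stops at the first element.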

If you have no control over the processEvent() method and cannot fix its behaviour, you can wrap the call in Mono.defer, which ensures that even errors raised during assembly are propagated through the Mono inside the flatMap:

Flux.range(0, 5)
    .doOnNext(event -> log.info("Processing -  {}", event))
    .flatMap(event -> Mono.defer(() -> processEvent(event))
                .doOnSuccess(result -> log.info("Processed - {}", event))
                .onErrorResume(t -> handleError(t, event)))
    .doOnError(t -> log.error("Exception propagated", t))
    .then()
    .subscribe();


private Mono<Void> processEvent(Object object) {
    throw new RuntimeException("test");
}
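Since the question worries that manual wrapping is easy to forget, one possible way to package the Mono.defer approach is a small helper that turns any Mono-returning function into a deferred one. This is only a sketch: `deferred`, `DeferDemo` and the `trace` list are hypothetical names, not part of Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class DeferDemo {

    // Hypothetical helper: delays the call to mapper until subscription, so an
    // exception thrown by mapper becomes an error signal of the returned Mono.
    static <T, R> Function<T, Mono<R>> deferred(Function<T, Mono<R>> mapper) {
        return value -> Mono.defer(() -> mapper.apply(value));
    }

    // Misbehaving method from the question: throws instead of returning Mono.error.
    static Mono<Void> processEvent(Integer event) {
        throw new RuntimeException("test");
    }

    static List<String> run() {
        List<String> trace = new ArrayList<>();
        Function<Integer, Mono<Void>> safeProcess = deferred(DeferDemo::processEvent);
        Flux.range(0, 3)
                .flatMap(event -> safeProcess.apply(event)
                        .onErrorResume(t -> {
                            trace.add("handled " + event);
                            return Mono.empty();
                        }))
                .doOnError(t -> trace.add("propagated"))
                .then()
                .block();
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here every element is handled by the inner `onErrorResume` and nothing reaches the outer `doOnError`, matching the behaviour of the Mono.error variant.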

Note that inside other intermediate operators such as map or doOnNext you are free to throw exceptions in the ugly way: Reactor can transform them into proper error signals, since at that point a Mono is already in progress.
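A short sketch of that last point, assuming a hypothetical `render` function that throws for one input: the exception thrown inside map arrives downstream as an ordinary error signal, so onErrorResume can recover from it.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class MapThrowDemo {

    // Hypothetical mapping function that throws directly for one input.
    static String render(Integer i) {
        if (i == 1) {
            throw new IllegalStateException("boom");
        }
        return "value " + i;
    }

    static List<String> run() {
        return Flux.range(0, 3)
                // The exception thrown by render is caught by the map operator
                // and passed downstream as an error signal.
                .map(MapThrowDemo::render)
                .onErrorResume(t -> Mono.just("recovered: " + t.getMessage()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that the error still terminates the source sequence: element 2 is never mapped, and the fallback value replaces the rest of the stream.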

Upvotes: 3
