ace
ace

Reputation: 12044

How to handle errors in Spring reactor Mono or Flux?

I have below code retuning Mono<Foo>:

try {
    return userRepository.findById(id)  // step 1
        .flatMap(user -> barRepository.findByUserId( user.getId())  // step 2
        .map(bar-> Foo.builder().msg("Already exists").build())  // step 3
            .switchIfEmpty(barRepository.save(Bar.builder().userId(user.getId()).build())  // step 4
                .map(bar-> Foo.builder().msg("Created").build())   // step 5 
            ))
            .doOnError(throwable -> Mono.just(handleError(throwable)));
    } catch(Exception e) {
        
        log.error("from catch block");
        return Mono.just(handleError(e));
        
    }

If an error occurs in step 1 (e.g. user does not exist by the specified id), will it be caught by doOnError or by try-catch block, or none of these two?

Same question if an error happens in step 2, step 3, step 4.

What is the correct code so that error is always caught by doOnError and eliminates try-catch?

I am using:

public interface UserRepository extends ReactiveMongoRepository<User, String>

same for barRepository.

handleError(throwable) simply does log.error(e.getMessage()) and retuns Foo.

Upvotes: 27

Views: 111514

Answers (6)

Dharmvir Tiwari
Dharmvir Tiwari

Reputation: 916

While creating the reactive flow, we need to use onError* as it provides a fallback Mono/Flux while doOn* are side-effect operators.

NOTE: The examples are in Kotlin

Below is an example:

fun saveItems(item: Item) = testRepository.save(item)
        .onErrorResume {
            Mono.error(
                onErrorResumeHandler(
                    it,
                    "APP-1002",
                    "Error occurred while saving the something :P, contact admin"
                )
            )
        }

fun onErrorResumeHandler(exception: Throwable, errorCode: String, errorMessage: String) =
    if (exception is TestRepositoryException) exception else
        TestServiceException(errorCode, errorMessage)

There should be a central exception handler, we can create by extending AbstractErrorWebExceptionHandler. The order is -2 to supersede the default.

Below is an example:

@Component
@Order(-2)
class BaseControllerAdvice(
    errorAttributes: ErrorAttributes,
    resources: WebProperties.Resources,
    applicationContext: ApplicationContext,
    serverCodecConfigurer: ServerCodecConfigurer
) : AbstractErrorWebExceptionHandler(errorAttributes, resources, applicationContext) {

    val log = logger()

    init {
        setMessageWriters(serverCodecConfigurer.writers)
    }

    override fun getRoutingFunction(errorAttributes: ErrorAttributes?) =
        router {
            RequestPredicates.all().invoke(this@BaseControllerAdvice::renderErrorResponse)
        }
    //RouterFunctions.route(RequestPredicates.all(),this::renderErrorResponse)

    fun renderErrorResponse(
        request: ServerRequest
    ): Mono<ServerResponse> {
        val errorPropertiesMap = getErrorAttributes(
            request,
            ErrorAttributeOptions.defaults()
        )
        val ex: ApplicationException = getError(request) as ApplicationException
        log.info("Error attributes:{}", request)
        return ServerResponse.status(HttpStatus.BAD_REQUEST)
            .contentType(MediaType.APPLICATION_JSON)
            .body(BodyInserters.fromValue(ErrorResponseVO(ex.errorCode, ex.errorMessage)))
    }

    data class ErrorResponseVO(val errorMessage: String, val errorCode: String)

}

Upvotes: 1

Andrea Ciccotta
Andrea Ciccotta

Reputation: 672

I think the first error is in the title: "Mono or Flux" is not related with the error handling.

  • Mono can only emit one item at the most (streams one element)
  • Flux can emit more complex stuff (i.e. List)

To handle errors you can follow this example:

return webClient.get()
                .uri(url)
                .retrieve()
                .bodyToMono(ModelYouAreRetrieving.class)
                .doOnError(throwable -> logger.error("Failed for some reason", throwable))
                .onErrorReturn(new ModelYouAreRetrieving(...))
                .block();

Upvotes: 17

quintin
quintin

Reputation: 837

Use Exceptions.propagate(e) which wraps a checked exception into a special runtime exception that can be handled by onError

Below Code tries to covers User attributes in upper case. Now, when it encounters kyle the checked exception is throws and MIKE is returned from onErrorReturn

@Test
void Test19() {
    Flux.fromIterable(Arrays.asList(new User("jhon", "10000"),
            new User("kyle", "bot")))
        .map(x -> {
            try {
                return toUpper(x);
            } catch (TestException e) {
                throw Exceptions.propagate(e);
            }
        })
        .onErrorReturn(new User("MIKE", "BOT")).subscribe(x -> System.out.println(x));
}

protected final class TestException extends Exception {
    private static final long serialVersionUID = -831485594512095557L;
}

private User toUpper(User user) throws TestException{
    if (user.getName().equals("kyle")) {
        throw new TestException();
    }
    return new User(user.getName().toUpperCase(), user.getProfession().toUpperCase());
}

Output

User [name=JHON, profession=10000]
User [name=MIKE, profession=BOT]

Upvotes: 4

Shuai Wang
Shuai Wang

Reputation: 152

@Gianluca Pinto's last line of code is also incorrect. The code won't be compiled. onErrorReturn is not suitable for complicated error handling. What you should use is onErrorResume.

see: https://grokonez.com/reactive-programming/reactor/reactor-handle-error#21_By_falling_back_to_another_Flux

onErrorResume will fall back to another Flux and let you catch and manage the exception thrown by previous Flux. if look into the implementation of onErrorReturn, you will find onErrorReturn is actually using onErrorResume.

So here the code should be:

.onErrorResume(throwable -> Mono.just(handleError(throwable)));

Upvotes: 2

Gianluca Pinto
Gianluca Pinto

Reputation: 345

The last line of the code of @James Ralston is wrong. The correct code should be:

return userRepository.findById(id)
.flatMap ( user -> 
    barRepository.findByUserId(user.getId())
    .map((user,bar)-> Foo.builder().msg("Already exists").build())  
    .switchIfEmpty(barRepository.save(Bar.builder().userId(user.getId()).build())
    .map(bar-> Foo.builder().msg("Created").build())

))
.onErrorReturn(Mono.just(handleError(throwable)));

Upvotes: 0

James Ralston
James Ralston

Reputation: 1208

DoOnError will only perform side effects and assuming the findById are will return a Mono.Error() if it fails something like this should work.

return userRepository.findById(id)
    .flatMap ( user -> 
        barRepository.findByUserId(user.getId())
        .map((user,bar)-> Foo.builder().msg("Already exists").build())  
        .switchIfEmpty(barRepository.save(Bar.builder().userId(user.getId()).build())
        .map(bar-> Foo.builder().msg("Created").build())

    ))
    .onErrorReturn(throwable -> Mono.just(handleError(throwable)));

The try catch will only work if you either call a blocking operation of the chain, or a runtime error occurs before you enter the reactive chain. the doOn operations do not modify the chain, they are used for side effects only. Since flatMap expects a producer, you will need to return a Mono from the call, and in this case if an error occurs, then it will just propagate the error. In all reactive chains the error will propagate unless otherwise handled.

Upvotes: 6

Related Questions