Archimedes Trajano

Reputation: 41300

How do you create a timeout but continue scenario in WebFlux?

I have a "logout" endpoint. For the most part I don't care what the user sends, but I want to "penalize" callers who send an invalid token, so there is a check. That check may take a while under load, so I want to limit the response time to 3 seconds but let the check continue processing in the background.

The code block I have looks like this...

    return claimsService
        .revoke(request.getToken(), serverWebExchange.getRequest().getHeaders())
        .timeout(Duration.ofMillis(authProperties.getRevokeProcessingTimeoutInMillis()))
        .doOnNext(
            serviceResponse -> {
              final var serverHttpResponse = serverWebExchange.getResponse();
              addCommonHeaders(serverHttpResponse);
              serverHttpResponse.setStatusCode(serviceResponse.getStatusCode());
            })
        // on security error just return ok and add penalty
        .onErrorResume(
            SecurityException.class,
            ex ->
                Mono.just(GatewayResponse.builder().ok(true).build())
                    .delayElement(
                        Duration.ofMillis(authProperties.getPenaltyDelayInMillis()),
                        penaltyScheduler))
        .onErrorResume(TimeoutException.class, ex1 -> respondWithOk(serverWebExchange))
        .subscribeOn(logoutScheduler);

The problem is that when the TimeoutException fires, the revocation itself stops as well, because timeout() cancels the upstream subscription.

Upvotes: 0

Views: 920

Answers (2)

Archimedes Trajano

Reputation: 41300

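Mono.firstWithSignal() and Mono.firstWithValue() take several publishers and return the result of the first one that responds. Note that Reactor cancels the losing sources once a winner emits, so whether the revocation keeps running after the timeout depends on how claimsService.revoke() reacts to cancellation.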

In my scenario the output is the same either way (basically an OK response), and even if it were not, that would not matter as long as a response is sent.

So I just need two Monos. The first is the original call without the timeout:

    final var logoutFromService =
        claimsService
            .revoke(request.getToken(), serverWebExchange.getRequest().getHeaders())
            .doOnNext(
                serviceResponse -> {
                  final var serverHttpResponse = serverWebExchange.getResponse();
                  addCommonHeaders(serverHttpResponse);
                  serverHttpResponse.setStatusCode(serviceResponse.getStatusCode());
                })
            .flatMap(this::addDelaySpecifiedInServiceResponse)
            // on security error just return ok and add penalty
            .onErrorResume(
                SecurityException.class,
                ex ->
                    Mono.just(GatewayResponse.builder().ok(true).build())
                        .delayElement(
                            Duration.ofMillis(authProperties.getPenaltyDelayInMillis()),
                            penaltyScheduler));

The second simply emits an OK response after a delay equal to the previous timeout:

    final var timeout =
        Mono.just(GatewayResponse.builder().ok(true).build())
            .delayElement(Duration.ofMillis(authProperties.getRevokeProcessingTimeoutInMillis()));

Then combine them with Mono.firstWithValue(), since I don't want to show any error to the end user:

    return Mono.firstWithValue(logoutFromService, timeout);
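Before relying on the slower source finishing in the background, it may be worth checking how the race treats it. The sketch below is one way to do that, assuming reactor-core 3.4 or later is on the classpath. The class name FirstWithValueRaceDemo, the loserCancelled() helper, and the durations are hypothetical and only serve the illustration; the two Mono.delay() sources stand in for the revocation call and the timeout response.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Mono;

public class FirstWithValueRaceDemo {

    /**
     * Races a slow source against a fast one and reports whether the slow
     * source saw a cancel signal after losing the race.
     */
    static boolean loserCancelled(Duration slow, Duration fast) {
        CountDownLatch cancelled = new CountDownLatch(1);

        // Hypothetical stand-in for the slow revocation call.
        Mono<String> slowSource = Mono.delay(slow)
            .thenReturn("slow")
            .doOnCancel(cancelled::countDown);

        // Hypothetical stand-in for the delayed OK response.
        Mono<String> fastSource = Mono.delay(fast).thenReturn("fast");

        String winner = Mono.firstWithValue(slowSource, fastSource).block();
        System.out.println("Winner: " + winner);

        try {
            // Wait briefly in case the cancel signal arrives after block() returns.
            return cancelled.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        boolean cancelled = loserCancelled(Duration.ofSeconds(2), Duration.ofMillis(100));
        System.out.println("Slow source cancelled: " + cancelled);
    }
}
```

If the slow source reports a cancel, the real revocation only keeps running when something else stays subscribed to it, or when the underlying call ignores cancellation.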

Here it is in my controller code

Upvotes: 0

Martin Tarjányi

Reputation: 9947

I'm not sure whether there is a more idiomatic way to do this, but you can use the cache() operator to split the flow: one subscriber keeps the work running, while a second one applies the timeout.

I've used some simplified code to demonstrate:

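    // cache() lets both subscribers below share one run of the slow call
    Mono<String> result = Mono.delay(Duration.ofSeconds(4))
        .thenReturn("late result")
        .cache();

    // detached subscriber: keeps the work running even if the caller times out
    result.doOnNext(x -> System.out.println("Async result: " + x))
        .doOnError(e -> System.out.println("async error"))
        .subscribe();

    // the caller waits at most 3 seconds, then gets the fallback
    return result.timeout(Duration.ofSeconds(3))
        .onErrorResume(TimeoutException.class, e -> Mono.just("fallback") /* placeholder fallback */);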
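For anyone who wants to try the cache() split outside a controller, here is a minimal runnable sketch, assuming reactor-core is on the classpath. The class name CacheTimeoutDemo, the slowRevoke() helper, the "fallback" value, and the shortened durations are hypothetical stand-ins, not part of the original code.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Mono;

public class CacheTimeoutDemo {

    // Hypothetical stand-in for a slow call such as claimsService.revoke(...).
    static Mono<String> slowRevoke(Duration workTime) {
        return Mono.delay(workTime).thenReturn("late result");
    }

    /** Returns {what the caller saw, what the background subscriber saw}. */
    static String[] run(Duration workTime, Duration limit) {
        AtomicReference<String> background = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // cache() shares a single upstream subscription between all subscribers.
        Mono<String> shared = slowRevoke(workTime).cache();

        // Subscriber 1: detached and never cancelled, so the work runs to the end.
        shared.doOnNext(background::set)
            .doFinally(signal -> done.countDown())
            .subscribe();

        // Subscriber 2: what the caller sees, limited by the timeout.
        String callerResult = shared
            .timeout(limit)
            .onErrorResume(TimeoutException.class, e -> Mono.just("fallback"))
            .block();

        try {
            // Only for the demo: wait so the background result can be inspected.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new String[] {callerResult, background.get()};
    }

    public static void main(String[] args) {
        String[] results = run(Duration.ofMillis(400), Duration.ofMillis(200));
        System.out.println("Caller saw: " + results[0]);
        System.out.println("Background finished with: " + results[1]);
    }
}
```

With these durations the caller is expected to receive the fallback while the detached subscriber still gets the late result. In a real handler you would return the timed Mono instead of calling block().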

Upvotes: 1
