Reputation: 41300
I have a "logout" endpoint. For the most part I don't care what the user sends, but I want to "penalize" them if they send an invalid token, so there is a check. That check may take a while under load, so I want to limit the wait to 3 seconds but let the check continue processing in the background.
The code I have looks like this:
return claimsService
    .revoke(request.getToken(), serverWebExchange.getRequest().getHeaders())
    .timeout(Duration.ofMillis(authProperties.getRevokeProcessingTimeoutInMillis()))
    .doOnNext(
        serviceResponse -> {
          final var serverHttpResponse = serverWebExchange.getResponse();
          addCommonHeaders(serverHttpResponse);
          serverHttpResponse.setStatusCode(serviceResponse.getStatusCode());
        })
    // on security error just return ok and add penalty
    .onErrorResume(
        SecurityException.class,
        ex ->
            Mono.just(GatewayResponse.builder().ok(true).build())
                .delayElement(
                    Duration.ofMillis(authProperties.getPenaltyDelayInMillis()),
                    penaltyScheduler))
    .onErrorResume(TimeoutException.class, ex1 -> respondWithOk(serverWebExchange))
    .subscribeOn(logoutScheduler);
The problem is that when the TimeoutException fires, the revocation code stops as well.
Upvotes: 0
Views: 920
Reputation: 41300
Mono.firstWithSignal() and Mono.firstWithValue() take two or more publishing sources and return the result of the first one that responds. However, as far as I can tell, the remaining Mono still runs to completion.
In my scenario the output is the same either way (basically an OK response), and even if it were not, that would not matter as long as a response is sent.
So I just need two Monos. The first is the original call without the timeout:
final var logoutFromService =
    claimsService
        .revoke(request.getToken(), serverWebExchange.getRequest().getHeaders())
        .doOnNext(
            serviceResponse -> {
              final var serverHttpResponse = serverWebExchange.getResponse();
              addCommonHeaders(serverHttpResponse);
              serverHttpResponse.setStatusCode(serviceResponse.getStatusCode());
            })
        .flatMap(this::addDelaySpecifiedInServiceResponse)
        // on security error just return ok and add penalty
        .onErrorResume(
            SecurityException.class,
            ex ->
                Mono.just(GatewayResponse.builder().ok(true).build())
                    .delayElement(
                        Duration.ofMillis(authProperties.getPenaltyDelayInMillis()),
                        penaltyScheduler));
The second just returns OK after a delay equal to the previous timeout:
final var timeout =
    Mono.just(GatewayResponse.builder().ok(true).build())
        .delayElement(Duration.ofMillis(authProperties.getRevokeProcessingTimeoutInMillis()));
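Then combine them with Mono.firstWithValue(), which skips sources that fail or complete empty as long as another source emits a value. That suits me, since I don't want to display any error to the end user.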
return Mono.firstWithValue(logoutFromService, timeout);
Here it is in my controller code
Upvotes: 0
Reputation: 9947
I'm not sure whether there is a more idiomatic way to do this, but you can use the cache() operator to split the flow.
I've used some simplified code to demonstrate:
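// cache() replays the result, so the source runs only once for both subscribers
Mono<String> result = Mono.delay(Duration.ofSeconds(4))
    .thenReturn("late result")
    .cache();
// background subscription: not affected by the timeout below
result.doOnNext(x -> System.out.println("Async result: " + x))
    .doOnError(e -> System.out.println("async error"))
    .subscribe();
// the caller waits at most 3 seconds; timing out cancels only this subscriber
return result.timeout(Duration.ofSeconds(3))
    .onErrorResume(TimeoutException.class, /*fallback*/);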
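If it helps to see the same split outside Reactor, here is a rough plain-JDK analogue using CompletableFuture (Java 9+). This is only a sketch, not Reactor code: the class name, `slowRevoke`, `boundedView`, and the "ok"/"revoked" strings are made up for illustration. The idea is the same as with cache(): one handle owns the real work, and the response is served from a derived view that is the only thing the timeout touches.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedWaitSketch {

    // Hypothetical stand-in for the slow revocation call.
    static CompletableFuture<String> slowRevoke(long workMillis, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(workMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "revoked";
        }, pool);
    }

    // Bounded view used for the response: the timeout completes only the copy,
    // so the original future keeps running and keeps its real outcome.
    static CompletableFuture<String> boundedView(CompletableFuture<String> work, long timeoutMillis) {
        return work.copy().completeOnTimeout("ok", timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletableFuture<String> work = slowRevoke(400, pool);
        // The caller gets the fallback after about 50 ms...
        System.out.println("response: " + boundedView(work, 50).join());
        // ...while the original work still finishes in the background.
        System.out.println("background: " + work.join());
        pool.shutdown();
    }
}
```

The design point is that the timeout is applied to a derived handle rather than to the work itself, which is what the cached Mono with its separate subscribe() achieves in the Reactor version.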
Upvotes: 1