Reputation: 7374
I always get stuck with the reactive way of thinking once I have to make multiple external calls. Right now I'm tasked with implementing the following scenario using Reactor and WebFlux.
What I have so far is just an example of how I would like to use the cache:
public static Mono<ServerResponse> doRequest(
        Function<Mono<Token>, Mono<ClientResponse>> clientRequest,
        Function<Mono<ClientResponse>, Mono<ServerResponse>> errorHandler
) {
    Token token = null;
    return token.getToken()
        .transform(clientRequest)
        .flatMap(clientResponse -> {
            if (clientResponse.statusCode().value() == 403) {
                // HOW WOULD I INVALIDATE ANYTHING HERE? Or where should i do it?
                return Mono.error(new TokenExpired("Token expired"));
            }
            return Mono.just(clientResponse);
        })
        .transform(errorHandler)
        .retryWhen(companion -> {
            return companion.zipWith(Flux.range(1, 4),
                (error, index) -> {
                    if (index < 4 && (error instanceof TokenExpired)) {
                        return index;
                    } else {
                        throw Exceptions.propagate(error);
                    }
                });
        });
}
I have looked at the Reactor addons cache but failed to understand how to invalidate an entry manually, since it appears to be only time-based. Also, when using the Caffeine cache, the behavior of invalidate is undefined in a multithreaded environment. I feel that my use case is standard, but I have been unable to find any patterns for how to do this.
The two issues I get stuck on are: making sure the cache is updated only once and without blocking, and invalidating the cache entry without blocking.
Really, I'm stuck on how to approach the problem in a reactive way. An accepted answer does not have to use Reactor; it could use any reactive library, as long as it shows the thinking needed to solve the problem.
Upvotes: 2
Views: 2580
Reputation: 9591
Caffeine discards an entry when its computation fails or returns null, and propagates that to the caller. An AsyncCache stores the future immediately, and a whenComplete callback does the error handling if necessary. Since Caffeine works with CompletableFuture, you can use Reactor's Mono/future converters (Mono.fromFuture and Mono#toFuture).
static final AsyncCache<Token, ServerResponse> cache = Caffeine.newBuilder().buildAsync();

public static Mono<ServerResponse> doRequest(
        Function<Mono<Token>, Mono<ClientResponse>> clientRequest,
        Function<Mono<ClientResponse>, Mono<ServerResponse>> errorHandler) {
    Token token = // ...
    Mono<ServerResponse> result = token.getToken()
        // cache.get stores the future right away; Caffeine drops it again if it fails
        .flatMap(t -> Mono.fromFuture(() -> cache.get(t, (key, executor) ->
            clientRequest.apply(Mono.just(key))
                .flatMap(clientResponse ->
                    (clientResponse.statusCode().value() == 403)
                        ? Mono.<ClientResponse>error(new TokenExpired("Token expired"))
                        : Mono.just(clientResponse))
                // applied inside the loader so the cached value is a ServerResponse
                .transform(errorHandler)
                .toFuture())));
    return result.retryWhen(...);
}
The cache.get(key, func) call inserts the future only if no mapping is present (an expired or collected entry pending cleanup is treated as absent). It blocks only for as long as it takes to return the future to the cache, which should be cheap if all the work is wrapped in the future. Any other requests wait for the key-to-future mapping to be established and are then handed the same future, until that mapping is removed (because the future failed or the entry was evicted).
It may be possible to write a generic adapter from AsyncCache to Reactor based on this idiom.
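To make the get-or-compute behavior described above concrete, here is a minimal sketch that uses only the JDK. It is a stand-in for illustration, not Caffeine's implementation: the class name FutureCache and its get and invalidate methods are hypothetical, and it has no eviction or expiry. It shows three things: the future is stored as soon as the loader returns it, concurrent callers share that one future, and a failed (or null) result removes the mapping so the next caller reloads.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for an async cache; NOT Caffeine's implementation.
final class FutureCache<K, V> {
    private final ConcurrentHashMap<K, CompletableFuture<V>> entries = new ConcurrentHashMap<>();

    // Returns the future already mapped to the key, or installs the one the loader returns.
    // The loader runs at most once per absent key and should return quickly,
    // doing its real work asynchronously inside the future.
    CompletableFuture<V> get(K key, Function<K, CompletableFuture<V>> loader) {
        boolean[] created = {false};
        CompletableFuture<V> future = entries.computeIfAbsent(key, k -> {
            created[0] = true;
            return Objects.requireNonNull(loader.apply(k), "loader returned null");
        });
        if (created[0]) {
            // Registered outside computeIfAbsent so the map is never modified
            // from within its own mapping function.
            future.whenComplete((value, error) -> {
                if (error != null || value == null) {
                    // Remove only this exact future, never a newer replacement.
                    entries.remove(key, future);
                }
            });
        }
        return future;
    }

    // Manual invalidation: the next get for this key runs its loader again.
    void invalidate(K key) {
        entries.remove(key);
    }
}
```

In the token scenario, the key would identify the credential and the loader would start the token request and return its future (for example via Mono#toFuture). On a 403 the caller would call invalidate before retrying, so that the retry triggers exactly one fresh load that all concurrent callers share.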
Upvotes: 3