user3139545

Reputation: 7374

Thread safe reactive cache

I always get stuck with the reactive way of thinking once I have to start making multiple external calls. Right now I'm tasked with implementing the following scenario using Reactor and WebFlux.

  1. Get token from cache
    1. Token not found
      1. Request from auth system new token
      2. Persist token in cache
    2. Token found
      1. Use token for request to external system
        1. External system responds 2xx
          1. Return response
        2. External system responds 403
          1. Invalidate token in cache
          2. Retry whole flow from the top

What I have so far is just an example of how I would like to use the cache:

  public static Mono<ServerResponse> doRequest(
      Function<Mono<Token>, Mono<ClientResponse>> clientRequest,
      Function<Mono<ClientResponse>, Mono<ServerResponse>> errorHandler
  ) {
    Token token = null; // placeholder: this should come from the cache
    return token.getToken()
        .transform(clientRequest)
        .flatMap(clientResponse -> {
          if (clientResponse.statusCode().value() == 403) {
            // HOW WOULD I INVALIDATE ANYTHING HERE? Or where should I do it?
            return Mono.error(new TokenExpired("Token expired"));
          }
          return Mono.just(clientResponse);
        })
        .transform(errorHandler)
        .retryWhen(companion -> companion.zipWith(Flux.range(1, 4),
            (error, index) -> {
              if (index < 4 && (error instanceof TokenExpired)) {
                return index;
              }
              throw Exceptions.propagate(error);
            }));
  }

I have looked at the reactor-addons cache helpers, but I failed to understand how to manually invalidate an entry, since the eviction there appears to be purely time-based. Also, when using a Caffeine cache directly, the behavior of invalidate during a concurrent load is undefined. I feel that my use case is standard, but I have been unable to find any patterns for how to do this.

The two issues I get stuck on are: making sure the cache is updated only once and in a non-blocking way, and invalidating an entry in the cache without blocking.

Really, I'm stuck on how to approach the problem in a reactive way. An accepted answer does not have to use Reactor; it could be any reactive library that shows the thinking needed to solve the problem.

Upvotes: 2

Views: 2580

Answers (1)

Ben Manes

Reputation: 9591

Caffeine will discard an entry when the computation fails or yields null, and propagate that to the caller. For an AsyncCache, the future is stored immediately and a whenComplete callback performs the error handling if necessary. Since Caffeine works with CompletableFuture, you can use Reactor's Mono/future converters.

AsyncCache<Token, ServerResponse> responseCache = Caffeine.newBuilder().buildAsync();

public static Mono<ServerResponse> doRequest(
    Function<Mono<Token>, Mono<ClientResponse>> clientRequest,
    Function<Mono<ClientResponse>, Mono<ServerResponse>> errorHandler) {
  Token token = // ...
  Mono<ServerResponse> result = Mono.fromFuture(() ->
      responseCache.get(token, (key, executor) ->
          clientRequest.apply(Mono.just(key))
              .flatMap(clientResponse ->
                  (clientResponse.statusCode().value() == HttpServletResponse.SC_FORBIDDEN)
                      ? Mono.<ClientResponse>error(new TokenExpired("Token expired"))
                      : Mono.just(clientResponse))
              .transform(errorHandler)
              .toFuture()));
  return result.retryWhen(...);
}

The cache.get(key, mappingFunction) call inserts the future only if no mapping is present (an expired or garbage-collected entry pending cleanup is treated as absent). It blocks only for the duration of returning the future to the cache, which is cheap as long as all of the work is wrapped inside the future. Any other request will wait for the key-to-future mapping to be established and receive that same future, until the mapping is removed (because the computation failed or the entry was evicted).
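The single-flight and remove-on-failure behavior described above can be imitated with plain java.util.concurrent primitives; the following is a minimal, hand-rolled sketch of the idiom (not Caffeine itself, and the class and method names are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Illustrative stand-in for the AsyncCache idiom: the future is stored
// eagerly, so concurrent callers for the same key share one in-flight load,
// and a failed computation removes its own mapping so the next caller
// triggers a fresh load.
class SingleFlightCache<K, V> {
  private final ConcurrentMap<K, CompletableFuture<V>> map = new ConcurrentHashMap<>();

  CompletableFuture<V> get(K key, Function<K, CompletableFuture<V>> loader) {
    CompletableFuture<V> future = map.computeIfAbsent(key, loader);
    // Registered outside computeIfAbsent to avoid mutating the map from
    // within the mapping function. remove(key, value) is conditional, so a
    // newer future stored under the same key is left untouched.
    future.whenComplete((value, error) -> {
      if (error != null) {
        map.remove(key, future);
      }
    });
    return future;
  }

  // Non-blocking invalidation of one specific resolved value, mirroring the
  // conditional cache.asMap().remove(key, future) pattern.
  void invalidate(K key, CompletableFuture<V> future) {
    map.remove(key, future);
  }
}
```

Because `remove(key, future)` only removes the exact future the caller resolved, a request that observed a stale 403 response cannot accidentally evict a fresh token that another request has already loaded.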

It may be possible to write a generic adapter of AsyncCache to Reactor based on this idiom.
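Such an adapter might look like the following; this is a rough sketch that assumes Caffeine and Reactor are on the classpath, and the method name and generics are illustrative rather than an established API:

```java
// Hypothetical generic AsyncCache-to-Reactor adapter based on the idiom above.
static <K, V> Mono<V> cachedMono(AsyncCache<K, V> cache, K key,
    Function<K, Mono<V>> loader) {
  // Mono.fromFuture with a supplier defers the lookup to subscription time.
  // A failed future is discarded by the cache, so a retry (resubscription)
  // naturally triggers a fresh load.
  return Mono.fromFuture(() ->
      cache.get(key, (k, executor) ->
          loader.apply(k)
              .subscribeOn(Schedulers.fromExecutor(executor))
              .toFuture()));
}
```

The caller would then compose `cachedMono(...)` with the 403 check, a conditional `cache.asMap().remove(key, future)` invalidation, and `retryWhen`, keeping the whole flow non-blocking.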

Upvotes: 3
