Dave

Reputation: 2386

How to cache items in Project Reactor and avoid a Cache Stampede?

How can I perform a get/compute lookup that is non-blocking and avoids a cache stampede?

Here is an example that will not stampede, but is blocking.

public static <KEY, VALUE> Mono<VALUE> lookupAndWrite(
    Map<KEY, Signal<? extends VALUE>> cacheMap, KEY key, Mono<VALUE> mono) {
    return Mono.defer(() -> Mono.just(cacheMap.computeIfAbsent(key, k -> 
        mono.materialize().block())).dematerialize());
}

Here is an example that will not block, but can stampede.

public static <KEY, VALUE> MonoCacheBuilderCacheMiss<KEY, VALUE> lookup(
        Function<KEY, Mono<Signal<? extends VALUE>>> reader, KEY key) {
    return otherSupplier -> writer -> Mono.defer(() ->
            reader.apply(key)
                    .switchIfEmpty(otherSupplier.get()
                            .materialize()
                            .flatMap(signal -> writer.apply(key, signal)
                            )
                    )
                    .dematerialize());
}

Is there an approach that will not stampede or block? Would it make sense to just subscribe the blocking call on its own scheduler?

Upvotes: 3

Views: 3424

Answers (1)

Ben Manes

Reputation: 9621

To rephrase your question, you want to avoid stampeding while allowing the computation to be performed asynchronously. This would ideally be done using a ConcurrentMap<K, Mono<V>> with computeIfAbsent that will discard the entry if the computation fails.
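As a rough illustration of that idea, here is a minimal JDK-only sketch. It swaps Mono<V> for CompletableFuture<V> so it runs without Reactor on the classpath, and the FutureCache class and its get method are hypothetical names made up for this example, not part of Reactor or Caffeine. It assumes the loader returns quickly with a non-null future, since computeIfAbsent holds a lock while the loader runs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical helper for illustration only.
final class FutureCache<K, V> {
    private final ConcurrentMap<K, CompletableFuture<V>> map = new ConcurrentHashMap<>();

    // Returns the cached future for the key, starting the load at most once
    // while an entry is present. Concurrent callers share the same future.
    CompletableFuture<V> get(K key, Function<K, CompletableFuture<V>> loader) {
        boolean[] created = {false};
        CompletableFuture<V> future = map.computeIfAbsent(key, k -> {
            created[0] = true;
            // Assumption: the loader only starts the work and must not return null.
            return loader.apply(k);
        });
        if (created[0]) {
            // Registered outside computeIfAbsent so the removal never runs
            // inside the map's own compute call.
            future.whenComplete((value, error) -> {
                if (error != null) {
                    // Discard only this failed future so a later call can retry.
                    map.remove(key, future);
                }
            });
        }
        return future;
    }
}
```

With Reactor, the returned future could be bridged back with Mono.fromFuture, and the loader could hand out mono.toFuture(), much like the Caffeine version does.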

Caffeine's AsyncLoadingCache provides this type of behavior by using CompletableFuture<V> as the value type. You could rewrite your blocking function as:

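public static <KEY, VALUE> Mono<VALUE> lookupAndWrite(
    AsyncLoadingCache<KEY, VALUE> cache, KEY key, Mono<VALUE> mono) {
  // cache.get computes the future at most once per key; concurrent callers share it.
  // The mono is subscribed on the executor that the cache passes in (e).
  return Mono.defer(() -> Mono.fromFuture(cache.get(key, (k, e) ->
      mono.subscribeOn(Schedulers.fromExecutor(e)).toFuture())));
}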

As of version 2.6.x, there is no simpler AsyncCache; it is planned for the 2.7 release, and feedback on it is welcome. That release will also include a ConcurrentMap<K, CompletableFuture<V>> view, which would let you generalize your method so that it does not depend on a provider-specific interface. For now, you can mimic a non-loading cache by avoiding the loading methods and building the cache with Caffeine.newBuilder().buildAsync(key -> null).

Upvotes: 3
