Reputation: 2386
How can I perform a get/compute lookup that is non-blocking and avoids a cache stampede?
Here is an example that will not stampede, but is blocking.
public static <KEY, VALUE> Mono<VALUE> lookupAndWrite(
        Map<KEY, Signal<? extends VALUE>> cacheMap, KEY key, Mono<VALUE> mono) {
    return Mono.defer(() -> Mono.just(cacheMap.computeIfAbsent(key, k ->
            mono.materialize().block())).dematerialize());
}
Here is an example that will not block, but can stampede.
public static <KEY, VALUE> MonoCacheBuilderCacheMiss<KEY, VALUE> lookup(
        Function<KEY, Mono<Signal<? extends VALUE>>> reader, KEY key) {
    return otherSupplier -> writer -> Mono.defer(() ->
            reader.apply(key)
                    .switchIfEmpty(otherSupplier.get()
                            .materialize()
                            .flatMap(signal -> writer.apply(key, signal)))
                    .dematerialize());
}
Is there an approach that will not stampede or block? Would it make sense to just subscribe the blocking call on its own scheduler?
Upvotes: 3
Views: 3424
Reputation: 9621
To rephrase your question, you want to avoid stampeding while allowing the computation to be performed asynchronously. This would ideally be done using a ConcurrentMap<K, Mono<V>> with computeIfAbsent that will discard the entry if the computation fails.
Caffeine's AsyncLoadingCache provides this type of behavior by using CompletableFuture<V> as the value type. You could rewrite your blocking function as
public static <KEY, VALUE> Mono<VALUE> lookupAndWrite(
        AsyncLoadingCache<KEY, VALUE> cache, KEY key, Mono<VALUE> mono) {
    return Mono.defer(() -> Mono.fromFuture(cache.get(key, (k, e) ->
            mono.subscribeOn(Schedulers.fromExecutor(e)).toFuture())));
}
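To illustrate the ConcurrentMap idea described at the top of this answer, here is a minimal sketch that uses only JDK classes, with CompletableFuture<V> standing in for Mono<V> as the value type. The class name FutureCache and the loader parameter are hypothetical, and this is not how Caffeine is implemented. It assumes the loader only starts asynchronous work and returns quickly, because it runs inside computeIfAbsent.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical helper, for illustration only.
final class FutureCache {
    private FutureCache() {}

    /**
     * Returns the future cached for the key, invoking the loader at most once
     * while an entry for that key is present. Callers never block: they all
     * receive the same in-flight future.
     */
    static <K, V> CompletableFuture<V> lookupAndWrite(
            ConcurrentMap<K, CompletableFuture<V>> cacheMap,
            K key,
            Function<? super K, CompletableFuture<V>> loader) {
        // Assumption: the loader returns promptly and does its real work
        // asynchronously, so the map's internal lock is held only briefly.
        CompletableFuture<V> future = cacheMap.computeIfAbsent(key, loader::apply);

        // Registered outside computeIfAbsent so that a future which has already
        // failed does not modify the map from inside the mapping function.
        // remove(key, value) deletes the entry only if it still holds this
        // exact future, so a newer entry is never discarded by mistake.
        future.whenComplete((value, error) -> {
            if (error != null) {
                cacheMap.remove(key, future);
            }
        });
        return future;
    }
}
```

With Reactor, the returned future could be adapted with Mono.fromFuture, and a Mono could be handed to the loader through toFuture(), as in the Caffeine-based method above. Note that this sketch never evicts successful entries, which is one of the things a real cache adds.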
As of version 2.6.x, there is no simpler AsyncCache; it is planned for the 2.7 release, and feedback on it is welcome. That release will also include a ConcurrentMap<K, CompletableFuture<V>> view, which would let you generalize your method so that it does not depend on a provider-specific interface. For now, you can mimic a non-loading cache by avoiding the loading methods and using Caffeine.newBuilder().buildAsync(key -> null).
Upvotes: 3