Reputation: 421
I am trying to create a Caffeine cache for my spring webflux application.
Here is the implementation:
private final Cache<String, OfferData> localCache = Caffeine.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.initialCapacity(1)
.removalListener((RemovalListener<String, OfferData>) (key, value, cause) -> {
log.debug(">> Caffeine Cache => Key: " + key + " | Value:" + value + " | Cause:" + cause);
})
.recordStats()
.build();
The caveat here is that I cannot use the refreshAfterWrite
property here. It is only applicable for Loading Cache and I cannot use the Loading Cache becuase the loader function returns a Mono.
Loader function below:
@Retryable(value = {RuntimeException.class}, maxAttempts = 2, backoff = @Backoff(delayExpression = "${service.offer.local-cache-load-retry-delay}"))
private Mono<OfferData> loadLocalCache() {
log.info(LOG_LOAD_LOCAL_CACHE + "Re-loading local offer cache...");
log.info(LOG_LOAD_LOCAL_CACHE + "Fetching offers from external cache...");
return fetchOffersFromExternalCache()
.doOnNext(offerData -> log.info(LOG_LOAD_LOCAL_CACHE + "Ads fetched successfully from external cache."))
.onErrorResume(throwable -> {
// Failed to load from the external cache. No data present in the cache.
// Loading the data from Ads Manager to the cache
log.error(LOG_LOAD_LOCAL_CACHE + "No offer data could be fetched from external cache: " + throwable);
log.info(LOG_LOAD_LOCAL_CACHE + "Fetching offers from source...");
// If data cannot be retrieved from the source, we do not populate the internal cache with anything
// for that key. We leave it blank for the next call to try and populate it.
return fetchOffersFromSource()
.doOnNext(offerData -> storeOffersInExternalCache(offerData, offerServiceProperties.getExternalCacheExpiration()))
.doOnError(sourceThrowable -> log.error(LOG_LOAD_LOCAL_CACHE + "Offer data not available at source: {}", sourceThrowable.getMessage()));
});
}
My implementation to extract the value from Mono and then store it in the cache:
public Mono<List<RuntimeOfferDetails>> getOffers(String eventName) {
return CacheMono.lookup(
k -> Mono.justOrEmpty(localCache.getIfPresent(k)).map(Signal::next),
OFFER_DATA_KEY
).onCacheMissResume(this::loadLocalCache)
.andWriteWith((k, sig) -> Mono.fromRunnable(() -> localCache.put(k, Objects.requireNonNull(sig.get()))))
.map(offerData -> getOfferDetails(offerData, eventName))
.map(offerDetailsList -> offerDetailsList.stream()
.map(RuntimeOfferDetails::toRuntimeOfferDetails)
.collect(Collectors.toList())
).doOnError(throwable -> {
log.error(LOG_GET_OFFERS + "No offer data found in the local cache neither could it be retrieved from external cache or source: {}", throwable.getMessage());
meterRegistry.counter(METRIC_FETCH_RUNTIME_OFFER_DETAILS, TAG_OUTCOME, FAILURE).increment();
}).doOnSuccess(runtimeOfferDetails -> {
log.info(LOG_GET_OFFERS + "Runtime offer details retrieved successfully.");
log.debug(LOG_GET_OFFERS + "Runtime offer details: [{}]", runtimeOfferDetails);
meterRegistry.counter(METRIC_FETCH_RUNTIME_OFFER_DETAILS, TAG_OUTCOME, SUCCESS).increment();
}).switchIfEmpty(Mono.error(new OfferServiceException("No offer data found for the event: " + eventName)));
}
I can control the cache by using the expireAfterWrite
property but again, it will just evict the keys from the in-memory cache. I want the same control that I had with refreshAfterWrite
but for a cache other than Loading cache. How can I implement this?
Upvotes: 2
Views: 992