Reputation: 737
In my application I am using spring webflux and I am using webclient to retrieve details from some 3rd party API. Now, I want to store the first time webClient response in some in memory cache so that for 2nd time I can have those response directly from the cache. I am trying to use Spring boot in memory caching mechanism and also "caffine". But none is working as expected. application.yml:
spring:
cache:
cache-names: employee
caffiene:
spec: maximumSize=200, expireAfterAccess=5m
EmployeeApplication.java:
@SpringBootApplication
@EnableCaching
public class EmployeeApplication{
public static void main(String[] args){
}
}
EmployeeController.java:
It has a rest endpoint employee/all
which fetch all employee from the 3rd party Api.
EmployeeService.java:
@Service
@Slf4j
public class EmployeeService{
@Autowired
private WebClient webClient;
@Autowired
private CacheManager cacheManager;
@Cacheable("employee")
public Mono<List<Employee>> getAllEmployee(){
log.info("inside employee service {}");
return webClient.get()
.uri("/employees/")
.retrieve()
.bodyToMono(Employee.class);
}
}
Although I have configured the cache name , 2nd time when I hit the url it is calling the service method. What cache mechanism need to be used to cache Mono response? Please suggest.
Upvotes: 14
Views: 17341
Reputation: 5924
There are several options to cache reactive publishers.
.cache()
Use reactive cache
API to cache Mono
for the defined duration
employeeService.getAllEmployee()
.cache(Duration.ofMinutes(60))
.flatMap(employees -> {
// process data
})
in many cases it would mean to create a private field with the defined cache and then use it as a part of the reactive flow.
Flux<> cachedEmployee = employeeService.getAllEmployee()
.cache(Duration.ofMinutes(60))
....
cachedEmployee.flatMap(employees -> {
// process data
})
Caffeine supports async cache based on CompletableFuture that could be easily adapted to Reactive API.
AsyncLoadingCache<String, List<Employee>> cache = Caffeine.newBuilder()
.buildAsync((tenant, executor) ->
employeeService.getAllEmployee(tenant).toFuture()
);
Mono<List<Employee>> getEmployee(String tenant) {
return Mono.fromCompletionStage(clientCache.get(tenant));
}
(https://projectreactor.io/docs/extra/snapshot/api/reactor/cache/CacheMono.html) from reactor-extra
. This option is more suitable if you need to cache results based on different input (e.g. multi tenant environment)
UPDATE: CacheMono
has been deprecated since reactor-extra 3.4.7. Better use #2 Use external cache with Caffeine.
Here is an example for Guava but you could adapt it for CacheManager
Cache<String, List<Employee>> cache = CacheBuilder.newBuilder()
.expireAfterWrite(cacheTtl)
.build();
Mono<List<Employee>> getEmployee(String tenant) {
return CacheMono.lookup(key -> Mono.justOrEmpty(cache.getIfPresent(key)).map(Signal::next), tenant)
.onCacheMissResume(() -> employeeService.getAllEmployee(tenant))
.andWriteWith((key, signal) -> Mono.fromRunnable(() ->
Optional.ofNullable(signal.get())
.ifPresent(value -> cache.put(key, value))
)
);
}
Upvotes: 8
Reputation: 93
We can use Mono.cache() and still avoid caching exceptions/errors and thus avoiding memory leaks. I found the below article that shows how to do it.
https://nickolasfisher.com/blog/InMemory-Caching-in-Sprint-Boot-WebfluxProject-Reactor
Upvotes: 4
Reputation: 39
A warning when using Mono.cache(): It caches Exceptions as well.
We just produced a very effective leak by combining Mono.cache() with Springs @Cachable and Coffein and one more library that added a suppressed exception to the thrown exception. Since it was always the identical exception thrown from cache, suppressed exceptions where accumulating on this exception. All that resulted in a runaway situation as soon as the cached exeption was logged. Up to 3 cores used explicitly for logging since the exception messages got so long with all the supressed exceptions. And the initial exception was a timeout.
Upvotes: 1