stumbler

Reputation: 737

Spring Boot reactive caching

In my application I am using Spring WebFlux, and I use WebClient to retrieve details from a 3rd-party API. I want to store the WebClient response in an in-memory cache on the first call, so that the second time I can serve the response directly from the cache. I have tried Spring Boot's in-memory caching mechanism and also Caffeine, but neither works as expected. application.yml:

spring:
  cache:
    cache-names: employee
    caffeine:
      spec: maximumSize=200,expireAfterAccess=5m

EmployeeApplication.java:

@SpringBootApplication
@EnableCaching
public class EmployeeApplication {
    public static void main(String[] args) {
        SpringApplication.run(EmployeeApplication.class, args);
    }
}

EmployeeController.java: it has a REST endpoint employee/all which fetches all employees from the 3rd-party API. EmployeeService.java:

@Service
@Slf4j
public class EmployeeService {

  @Autowired
  private WebClient webClient;

  @Autowired
  private CacheManager cacheManager;

  @Cacheable("employee")
  public Mono<List<Employee>> getAllEmployee() {
    log.info("inside employee service");
    return webClient.get()
        .uri("/employees/")
        .retrieve()
        .bodyToFlux(Employee.class)
        .collectList();
  }
}

Although I have configured the cache name, the second time I hit the URL it still calls the service method. What caching mechanism should be used to cache a Mono response? Please suggest.

Upvotes: 14

Views: 17341

Answers (3)

Alex

Reputation: 5924

There are several options to cache reactive publishers.

1. Reactive .cache()

Use the reactive cache API to cache the Mono for a defined duration:

employeeService.getAllEmployee()
    .cache(Duration.ofMinutes(60))
    .flatMap(employees -> {
        // process data and return a Publisher
        return Mono.just(employees);
    });

In many cases this means creating a private field holding the cached publisher and then using it as part of the reactive flow:

Mono<List<Employee>> cachedEmployee = employeeService.getAllEmployee()
    .cache(Duration.ofMinutes(60));
    
....

cachedEmployee.flatMap(employees -> {
    // process data and return a Publisher
    return Mono.just(employees);
});
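
Applied to the EmployeeService from the question, a minimal sketch of this approach might look like the following (the cachedEmployees field and the 60-minute TTL are illustrative choices, not part of the original code):

import java.time.Duration;
import java.util.List;

import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;

import reactor.core.publisher.Mono;

@Service
public class EmployeeService {

    private final Mono<List<Employee>> cachedEmployees;

    public EmployeeService(WebClient webClient) {
        // Build the pipeline once; cache(Duration) replays the last emitted list
        // to later subscribers for 60 minutes instead of re-calling the remote API.
        this.cachedEmployees = webClient.get()
                .uri("/employees/")
                .retrieve()
                .bodyToFlux(Employee.class)
                .collectList()
                .cache(Duration.ofMinutes(60));
    }

    public Mono<List<Employee>> getAllEmployee() {
        return cachedEmployees;
    }
}

With this variant the @Cacheable annotation and the CacheManager are no longer needed for this method; the caching lives entirely inside the reactive pipeline.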

2. Use an external cache with Caffeine

Caffeine supports an asynchronous cache based on CompletableFuture, which can easily be adapted to the Reactive API.

AsyncLoadingCache<String, List<Employee>> cache = Caffeine.newBuilder()
    .buildAsync((tenant, executor) ->
            employeeService.getAllEmployee(tenant).toFuture()
    );


Mono<List<Employee>> getEmployee(String tenant) {
    return Mono.fromCompletionStage(cache.get(tenant));
}
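
If you want the async cache to honor the limits from the question's application.yml (maximumSize=200, expireAfterAccess=5m), the same settings can be applied on the builder. A sketch, reusing the tenant-keyed loader from the snippet above:

AsyncLoadingCache<String, List<Employee>> cache = Caffeine.newBuilder()
        .maximumSize(200)                          // same limit as the spec in the question
        .expireAfterAccess(Duration.ofMinutes(5))  // same expiry as the spec in the question
        .buildAsync((tenant, executor) ->
                employeeService.getAllEmployee(tenant).toFuture()
        );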

3. Use an external cache with Guava and CacheMono [DEPRECATED]

CacheMono (https://projectreactor.io/docs/extra/snapshot/api/reactor/cache/CacheMono.html) comes from reactor-extra. This option is more suitable if you need to cache results based on different inputs (e.g. a multi-tenant environment).

UPDATE: CacheMono has been deprecated since reactor-extra 3.4.7. It is better to use option #2, an external cache with Caffeine.

Here is an example for Guava, but you could adapt it for a CacheManager:

Cache<String, List<Employee>> cache = CacheBuilder.newBuilder()
        .expireAfterWrite(cacheTtl)
        .build();


Mono<List<Employee>> getEmployee(String tenant) {
    return CacheMono.lookup(key -> Mono.justOrEmpty(cache.getIfPresent(key)).map(Signal::next), tenant)
            .onCacheMissResume(() -> employeeService.getAllEmployee(tenant))
            .andWriteWith((key, signal) -> Mono.fromRunnable(() ->
                            Optional.ofNullable(signal.get())
                                    .ifPresent(value -> cache.put(key, value))
                    )
            );
}

Upvotes: 8

Arun Subramanian

Reputation: 93

We can use Mono.cache() and still avoid caching exceptions/errors, thus avoiding memory leaks. The article below shows how to do it; a sketch of the idea follows the link.

https://nickolasfisher.com/blog/InMemory-Caching-in-Sprint-Boot-WebfluxProject-Reactor
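
A minimal sketch of that idea, using the Mono.cache overload that takes separate TTL functions for values, errors, and empty completions (the 5-minute TTL is illustrative). Returning Duration.ZERO for the error and empty cases means failures are not retained, so the next subscriber triggers a fresh call instead of replaying a cached exception:

Mono<List<Employee>> cachedEmployees = employeeService.getAllEmployee()
        .cache(
            value -> Duration.ofMinutes(5),  // keep successful results for 5 minutes
            error -> Duration.ZERO,          // do not cache errors
            () -> Duration.ZERO              // do not cache empty completions
        );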

Upvotes: 4

flok

Reputation: 39

A warning when using Mono.cache(): it caches exceptions as well.

We just produced a very effective leak by combining Mono.cache() with Spring's @Cacheable, Caffeine, and one more library that added a suppressed exception to the thrown exception. Since it was always the identical exception thrown from the cache, suppressed exceptions kept accumulating on it. This resulted in a runaway situation as soon as the cached exception was logged: up to 3 cores were used just for logging, because the exception messages grew so long with all the suppressed exceptions. And the initial exception was a timeout.

Upvotes: 1
