Sam

Reputation: 9952

How do I use an async cache with Kotlin coroutines?

I have a Kotlin JVM server application using coroutines and I need to put a cache in front of a non-blocking network call. I figure I can use a Caffeine AsyncLoadingCache to get the non-blocking cache behaviour I need. The AsyncCacheLoader interface I would need to implement uses CompletableFuture. Meanwhile, the method I want to call to load the cache entries is a suspend function.

I can bridge the gap like this:

abstract class SuspendingCacheLoader<K, V>: AsyncCacheLoader<K, V> {
    abstract suspend fun load(key: K): V

    final override fun asyncLoad(key: K, executor: Executor): CompletableFuture<V> {
        return GlobalScope.async(executor.asCoroutineDispatcher()) {
            load(key)
        }.asCompletableFuture()
    }
}

This will run the load function on the provided Executor (by default, ForkJoinPool.commonPool()), which from the point of view of Caffeine is the correct behaviour.

However, I know that I should try to avoid using GlobalScope to launch coroutines.

I considered having my SuspendingCacheLoader implement CoroutineScope and manage its own coroutine context. But CoroutineScope is intended to be implemented by objects with a managed lifecycle. Neither the cache nor the AsyncCacheLoader has any lifecycle hooks. The cache owns the Executor and the CompletableFuture instances, so it already controls the lifecycle of the loading tasks that way. I can't see that having the tasks be owned by a coroutine context would add anything, and I'm worried that I wouldn't be able to correctly close the coroutine context after the cache stopped being used.

Writing my own asynchronous caching mechanism would be prohibitively difficult, so I'd like to integrate with the Caffeine implementation if I can.

Is using GlobalScope the right approach to implement AsyncCacheLoader, or is there a better solution?

Upvotes: 3

Views: 7378

Answers (6)

Lior Derei

Reputation: 119

@Sam had a beautiful solution! Just to make it completely generic, so that the loading function can be supplied from outside:

class SuspendCache<K, V>(private val asyncCache: AsyncCache<K, V>) {
    suspend fun get(key: K, retrieveValue: suspend (K) -> V): V = supervisorScope {
        getAsync(key, retrieveValue).await()
    }

    private fun CoroutineScope.getAsync(key: K, retrieveValue: suspend (K) -> V) = asyncCache.get(key) { k, _ ->
        future {
            retrieveValue(k)
        }
    }
}

Upvotes: 0

Debop

Reputation: 1075

I suggest an extension function like this:

inline fun <K: Any, V: Any> Caffeine<Any, Any>.suspendingLoadingCache(
    crossinline suspendedLoader: suspend (key: K) -> V
): AsyncLoadingCache<K, V> =
    buildAsync { key, executor: Executor ->
        CoroutineScope(executor.asCoroutineDispatcher()).future {
            suspendedLoader(key)
        }
    }

GlobalScope is not recommended; use CoroutineScope(executor.asCoroutineDispatcher()) instead.

The future builder is defined in the kotlinx-coroutines-jdk8 module.

Upvotes: 2

Sam

Reputation: 9952

After some thought I've come up with a much simpler solution that I think uses coroutines more idiomatically.

The approach works by using AsyncCache.get(key, mappingFunction), instead of implementing an AsyncCacheLoader. However, it ignores the Executor that the cache is configured to use, following the advice of some of the other answers here.

class SuspendingCache<K, V>(private val asyncCache: AsyncCache<K, V>) {
    suspend fun get(key: K): V = supervisorScope {
        getAsync(key).await()
    }

    private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, _ ->
        future { 
            loadValue(k) 
        }
    }

    private suspend fun loadValue(key: K): V = TODO("Load the value")
}

Note that this depends on kotlinx-coroutines-jdk8 for the future coroutine builder and the await() function.

I think ignoring the Executor is probably the right choice. As @Kiskae points out, the cache will use the ForkJoinPool by default. Choosing to use that rather than the default coroutine dispatcher is probably not useful. However, it would be easy to use it if we wanted to, by changing the getAsync function:

private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, executor ->
    future(executor.asCoroutineDispatcher()) { 
        loadValue(k) 
    }
}
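
To illustrate how the wrapper behaves, here is a minimal runnable sketch, assuming Caffeine and kotlinx-coroutines (with the future integration) are on the classpath. CountingCache and lookUpTwice are hypothetical names, and the loader is passed in as a parameter only so that the example is self-contained. It shows two concurrent lookups of the same key sharing a single load.

```kotlin
import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Caffeine
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.await
import kotlinx.coroutines.future.future
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope
import java.util.concurrent.atomic.AtomicInteger

// Same shape as SuspendingCache, with the loader injected (hypothetical name).
class CountingCache<K : Any, V : Any>(
    private val asyncCache: AsyncCache<K, V>,
    private val loadValue: suspend (K) -> V
) {
    suspend fun get(key: K): V = supervisorScope {
        // The executor argument is ignored; the load runs in the caller's context.
        asyncCache.get(key) { k, _ -> future { loadValue(k) } }.await()
    }
}

// Returns the two looked-up values and the number of times the loader ran.
fun lookUpTwice(): Pair<List<Int>, Int> = runBlocking {
    val loadCount = AtomicInteger()
    val cache = CountingCache<String, Int>(Caffeine.newBuilder().buildAsync()) { key ->
        loadCount.incrementAndGet()
        delay(50) // stand-in for the non-blocking network call
        key.length
    }
    // The second lookup finds the in-flight future and awaits it instead of loading again.
    val values = listOf(
        async { cache.get("hello") },
        async { cache.get("hello") }
    ).awaitAll()
    values to loadCount.get()
}

fun main() {
    println(lookUpTwice())
}
```

One consequence of this design is that the load runs as a child of whichever caller triggered it, so if that caller is cancelled while the load is in flight, other callers awaiting the same key will see the cancellation as well.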

Upvotes: 4

lightsing

Reputation: 280

Here's a simple solution. Replace K and V with your own types.

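    // Assumes this code runs where a CoroutineScope is in scope, so that launch resolves.
    val cache = Caffeine.newBuilder().buildAsync<K, V> { key: K, _ ->
      val future = CompletableFuture<V>()

      launch {
        try {
          future.complete(someAwaitOperation(key))
        } catch (e: Throwable) {
          // Propagate failures; otherwise the future would never complete.
          future.completeExceptionally(e)
        }
      }

      future
    }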

Upvotes: 1

javen

Reputation: 138

Here is my solution:

Define an extension function on CoroutineVerticle (the Vert.x verticle base class, which is itself a CoroutineScope):

fun <K, V> CoroutineVerticle.buildCache(configurator: Caffeine<Any, Any>.() -> Unit = {}, loader: suspend CoroutineScope.(K) -> V) = Caffeine.newBuilder().apply(configurator).buildAsync { key: K, _ ->
    // do not use cache's executor
    future {
        loader(key)
    }
}

Create the cache within a CoroutineVerticle:

val cache : AsyncLoadingCache<String, String> = buildCache({
  maximumSize(10_000)
  expireAfterWrite(10, TimeUnit.MINUTES)
}) { key ->
    // load data and return it
    delay(1000)
    "data for key: $key"
}

Use the cache

suspend fun doSomething() {
    // String literals need double quotes; single quotes denote a Char in Kotlin.
    val data = cache.get("key").await()

    val future = cache.get("key2")
    val data2 = future.await()
}

Upvotes: 0

Kiskae

Reputation: 25603

"The cache owns the Executor and the CompletableFuture instances, so it already controls the lifecycle of the loading tasks that way."

This is not true. The Caffeine documentation specifies that it uses a user-provided Executor, or ForkJoinPool.commonPool() if none is provided. This means that there is no default lifecycle.

Regardless, calling GlobalScope directly seems like the wrong solution, because there is no reason to hardcode that choice. Simply accept a CoroutineScope through the constructor, and pass GlobalScope as the argument for as long as you don't have an explicit lifecycle to bind the cache to.
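
A minimal sketch of that idea, assuming the same loader shape as in the question. ScopedSuspendingCacheLoader and demo are hypothetical names, and the demo uses a standalone scope with a SupervisorJob purely for illustration; GlobalScope could be passed in the same position.

```kotlin
import com.github.benmanes.caffeine.cache.AsyncCacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.future
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor

// The scope is injected rather than hardcoded, so the caller decides the lifecycle.
abstract class ScopedSuspendingCacheLoader<K : Any, V : Any>(
    private val scope: CoroutineScope
) : AsyncCacheLoader<K, V> {
    abstract suspend fun load(key: K): V

    // Runs the suspend loader on Caffeine's executor, as a child of the injected scope.
    final override fun asyncLoad(key: K, executor: Executor): CompletableFuture<V> =
        scope.future(executor.asCoroutineDispatcher()) { load(key) }
}

// Builds a cache, loads one value, then cancels the scope (hypothetical demo).
fun demo(): Int {
    val scope = CoroutineScope(SupervisorJob())
    val loader = object : ScopedSuspendingCacheLoader<String, Int>(scope) {
        override suspend fun load(key: String): Int {
            delay(10) // stand-in for the non-blocking network call
            return key.length
        }
    }
    val cache = Caffeine.newBuilder().buildAsync(loader)
    val value = cache.get("hello").get() // blocking here only to keep the demo short
    scope.cancel() // whoever owns the scope decides when loading stops
    return value
}

fun main() {
    println(demo())
}
```

With this shape, binding the cache to a real lifecycle later only means passing a different scope; the loader itself does not change.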

Upvotes: 3
