jon_wu

Reputation: 1152

How to populate a Caffeine AsyncCache with Scaffeine and return a Map without a race condition?

I'm using Scaffeine (a Scala wrapper around Caffeine) to populate an AsyncCache with async loader functions. Once every load has finished, I want to return the cache contents as a synchronous Map.

Unfortunately, I'm running into a race condition, so I think I may be making some bad assumptions. See comments in the code below.

import com.github.blemale.scaffeine.{AsyncCache, Scaffeine}

import scala.collection.concurrent
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

sealed trait Cacheable[A] {
  def cacheKey(obj: A): Any
  def bytes(obj: A): Future[Array[Byte]]
}

object CacheRaceDebug {

  def buildCache[A](objs: Seq[A])(implicit cacheable: Cacheable[A]): Future[concurrent.Map[Any, Array[Byte]]] = {
    val cache: AsyncCache[Any, Array[Byte]] = Scaffeine().buildAsync()
    Future
      .sequence(objs.map { o =>
        val key: Any = implicitly[Cacheable[A]].cacheKey(o)
        val bytes: Future[Array[Byte]] = implicitly[Cacheable[A]].bytes(o)
        // `bytes` is an async "computation" for something like a generated image or download.
        // It is intended to populate the cache.
        // We wait on the `Future` returned here, and on those from all other loads, to know when the cache should be populated.
        cache.getFuture(key, _ => bytes)
      })
      .map((allBytes: Seq[Array[Byte]]) => {
        // Now that all the `Future`s returned from `getFuture` above have resolved,
        // we (APPARENTLY INCORRECTLY) assume the cache has been loaded and all values are ready.
        // The intention here is to return a map with all values that can be used synchronously.
        cache.synchronous.asMap
      })
  }

}

I suspect this is not specific to Scala or Scaffeine, and that I'm instead making an incorrect assumption about the getFuture method and the Caffeine cache underneath it. Is the problem that the Future it returns doesn't guarantee that the resolved value is already in the cache?

Is there a better way to wait for all in-progress loads to complete, and for the cache to be updated, before calling cache.synchronous.asMap?

Workaround

...or should I just build the synchronous Map manually in Scala and forget about trying to get the map from the cache directly, or at all?

Something like this works fine (mapping each loaded value to a key -> value tuple, then using _.toMap to build a Map from the entries), but it would still be helpful to know where my assumptions about Scaffeine or Caffeine are wrong, for future reference:

Future
  .traverse(objs) { obj =>
    val key: Any = cacheable.cacheKey(obj)
    cache
      .getFuture(key, _ => cacheable.bytes(obj))
      .map(bytes => key -> bytes)
  }
  .map(_.toMap)

And in this specific scenario, where all the objects are known locally, it's easy to deduplicate the keys first and avoid the cache completely.
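
For reference, here is a minimal sketch of that cache-free variant, using only the Scala standard library. The names DedupWithoutCache and buildMap are made up for this example, and the trait is a non-sealed copy of the Cacheable type class above so the snippet stands on its own. It assumes that any object sharing a key would produce the same bytes, so loading only the first object per key is acceptable.

```scala
import scala.concurrent.{ExecutionContext, Future}

object DedupWithoutCache {

  // Same shape as the Cacheable type class in the question (copied here so the sketch is self-contained).
  trait Cacheable[A] {
    def cacheKey(obj: A): Any
    def bytes(obj: A): Future[Array[Byte]]
  }

  // Hypothetical helper: loads each distinct key once and returns an immutable Map.
  def buildMap[A](objs: Seq[A])(
      implicit cacheable: Cacheable[A],
      ec: ExecutionContext
  ): Future[Map[Any, Array[Byte]]] = {
    // Group by key and keep the first object per key, so `bytes` runs at most once per key.
    // Assumption: objects with equal keys yield equal bytes.
    val firstPerKey: Seq[(Any, A)] =
      objs.groupBy(o => cacheable.cacheKey(o)).map { case (key, group) => key -> group.head }.toSeq

    Future
      .traverse(firstPerKey) { case (key, obj) =>
        cacheable.bytes(obj).map(loaded => key -> loaded)
      }
      .map(_.toMap) // built only after every load has completed
  }
}
```

Because the Map is assembled from the values the futures resolve to, it doesn't depend on when (or whether) a cache has recorded them.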

Caffeine "directly" also didn't work

I also tried something similar without Scaffeine and that had the same issue:

val cache = Caffeine.newBuilder().asInstanceOf[Caffeine[Any, Array[Byte]]].buildAsync[Any, Array[Byte]]()
Future
  .traverse(objs) { obj =>
    cache
      .get(
        cacheable.cacheKey(obj),
        asJavaBiFunction((_: Any, _: Executor) => cacheable.bytes(obj).toJava.toCompletableFuture)
      )
      .toScala
  }
  .map(_ => cache.synchronous.asMap.asScala.toMap)

Upvotes: 0

Views: 48

Answers (0)
