Daniel Olszewski

Reputation: 14401

Async value calculation in distributed Hazelcast cache

Current setup

I have two instances of an application running on two separate machines. Both application nodes share Hazelcast configuration for the distributed cache. The cache works fine with the regular IMap<String, SomeSerializableValue>.

Problem

Now I have a case in which calculating a cacheable value is slow and costly. I want to make sure that the value for a given key is calculated on only one application node. If another node requests the value for that key while the calculation is still in progress, that node should wait for the calculation to complete instead of starting its own.

The ideal solution would be something like this:

IMap<String, CompletableFuture<SomeSerializableValue>>

but this approach obviously doesn't work because CompletableFuture isn't serializable.
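For reference, here is a minimal single-JVM sketch of the semantics being asked for: one computation per key, with later callers receiving the same pending future. It uses only java.util.concurrent, so it is not distributed and is not a Hazelcast API. The class name LocalAsyncCache and the loader parameter are made up for illustration. The last line of main also confirms that CompletableFuture does not implement Serializable.

```java
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical local (non-distributed) cache that stores futures instead of values.
class LocalAsyncCache<K, V> {
    private final ConcurrentMap<K, CompletableFuture<V>> futures = new ConcurrentHashMap<>();
    private final Function<K, V> loader;

    LocalAsyncCache(Function<K, V> loader) {
        this.loader = loader;
    }

    CompletableFuture<V> get(K key) {
        // computeIfAbsent is atomic per key: the first caller creates the future,
        // every later caller gets that same future and can wait on it.
        return futures.computeIfAbsent(key,
                k -> CompletableFuture.supplyAsync(() -> loader.apply(k)));
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        LocalAsyncCache<String, Integer> cache = new LocalAsyncCache<>(key -> {
            calls.incrementAndGet();   // count how often the "slow" loader runs
            return key.length();
        });

        CompletableFuture<Integer> first = cache.get("abc");
        CompletableFuture<Integer> second = cache.get("abc");

        System.out.println(first == second);   // true: both callers share one future
        System.out.println(first.join());      // 3
        System.out.println(calls.get());       // 1: the loader ran once
        // false: CompletableFuture cannot be stored as a serialized map value
        System.out.println(Serializable.class.isAssignableFrom(CompletableFuture.class));
    }
}
```

Note that a future that completes exceptionally stays in this map, so a real implementation would probably want to evict failed entries.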

Question

Is there any alternative solution in Hazelcast which can cover my requirements?

For instance, Caffeine has a concept of AsyncLoadingCache. However, it's not distributed. Can Hazelcast cache asynchronously calculated values?

Upvotes: 1

Views: 1122

Answers (2)

bluecheese

Reputation: 71

I'm having the same problem as you.

I'm trying to write a generic Kotlin class for reading and populating the cache (as a ConcurrentMap<Key, CompletableFuture<Value>>) that invokes the passed callback only if the cache has no value for the key.

So I create a Hazelcast map whose values are wrapped in a CompletableFuture:

    companion object {

        fun <K, V> create(config: MapConfig, name: String) =
            Hazelcast.newHazelcastInstance()
                .apply {
                    this.config.addMapConfig(config)
                }.getMap<K, CompletableFuture<V>>(name)  // Trying to wrap values in CompletableFuture, as Caffeine's AsyncLoadingCache does
    }

I'm getting the following exception when trying to read from the cache:

Failed to serialize 'java.util.concurrent.CompletableFuture'
com.hazelcast.nio.serialization.HazelcastSerializationException: Failed to serialize 'java.util.concurrent.CompletableFuture'
    at com.hazelcast.internal.serialization.impl.SerializationUtil.handleSerializeException(SerializationUtil.java:115)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:175)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:151)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toData(AbstractSerializationService.java:136)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toData(AbstractSerializationService.java:124)
    at com.hazelcast.spi.impl.NodeEngineImpl.toData(NodeEngineImpl.java:341)
    at com.hazelcast.spi.impl.AbstractDistributedObject.toData(AbstractDistributedObject.java:78)
    at com.hazelcast.map.impl.proxy.MapProxyImpl.putIfAbsent(MapProxyImpl.java:172)
    at com.hazelcast.map.impl.proxy.MapProxyImpl.putIfAbsent(MapProxyImpl.java:162)
    at cache.utility.caffeine.SuspendingCache$get$$inlined$read$1.invokeSuspend(

Here is the Hazelcast IMap API (including its async methods): https://docs.hazelcast.org/docs/3.8/javadoc/com/hazelcast/core/IMap.html

I ended up with the following:


        if (cache.containsKey(key)) {  // Read operations are thread-safe and do not block
            return cache.getAsync(key).await() // In Kotlin: kotlinx.coroutines.future.await(); in Java you would attach a callback to the returned future
        } else {
            // Caveat: this check-then-act is not atomic, so two nodes can still compute the same key at the same time
            val value = callBack.invoke(key)
            cache.setAsync(key, value)
            return value
        }
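The containsKey check followed by a later set is not atomic, so two nodes can both miss and both run the callback. One possible way to close that gap is to use IMap's per-key lock (lock(key) / unlock(key)) with a re-check inside the lock. The sketch below is in Java and shows only the pattern. LockableMap is a hypothetical stand-in interface mirroring four IMap methods (get, set, lock, unlock) so the example compiles without Hazelcast on the classpath, and InMemoryLockableMap exists only to exercise it. Unlike the future-based approach in the question, waiting callers block on the lock.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

final class LockedLoader {
    // Returns the cached value, or computes and stores it while holding the key's lock.
    static <K, V> V getOrCompute(LockableMap<K, V> map, K key, Function<K, V> loader) {
        V cached = map.get(key);          // fast path: no lock when the value exists
        if (cached != null) {
            return cached;
        }
        map.lock(key);                    // blocks while another caller holds this key's lock
        try {
            V value = map.get(key);       // re-check: another caller may have just stored it
            if (value == null) {
                value = loader.apply(key);
                map.set(key, value);
            }
            return value;
        } finally {
            map.unlock(key);
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        LockableMap<String, Integer> map = new InMemoryLockableMap<>();
        Function<String, Integer> loader = k -> { calls.incrementAndGet(); return k.length(); };

        System.out.println(getOrCompute(map, "abc", loader)); // 3 (computed)
        System.out.println(getOrCompute(map, "abc", loader)); // 3 (served from the map)
        System.out.println(calls.get());                      // 1
    }
}

// Hypothetical stand-in for the subset of IMap used above; not a Hazelcast type.
interface LockableMap<K, V> {
    V get(K key);
    void set(K key, V value);
    void lock(K key);
    void unlock(K key);
}

// In-memory implementation used only to run the sketch in a single JVM.
final class InMemoryLockableMap<K, V> implements LockableMap<K, V> {
    private final Map<K, V> data = new ConcurrentHashMap<>();
    private final Map<K, ReentrantLock> locks = new ConcurrentHashMap<>();

    public V get(K key) { return data.get(key); }
    public void set(K key, V value) { data.put(key, value); }
    public void lock(K key) { locks.computeIfAbsent(key, k -> new ReentrantLock()).lock(); }
    public void unlock(K key) { locks.get(key).unlock(); }
}
```

With a real IMap in place of the stand-in, the lock is cluster-wide, so the slow computation would run on one node while the others wait at lock(key) and then read the stored value.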

Upvotes: 0

Neil Stevenson

Reputation: 3150

Would an Offloadable EntryProcessor work?

This allows you to apply a "long-running" update to a map entry's value on a separate thread instead of the partition thread.

While the update is running, read calls return the existing value. Only once the update calculation completes does the write become visible.

Upvotes: 1
