sebjun87

Reputation: 61

Hazelcast JCache CacheEntryListener triggered too often

It seems to me that I have a problem with the callback mechanism of the JCache CacheEntryListener. I am trying to set up several JCache members (Hazelcast implementation), each running on its own as a separate Java application on my local machine (for now, I start the main method manually in Eclipse for each node).

At the beginning, each node holds just one empty cache of type <Long, SpecificDataType>. Starting from the default hazelcast-default.xml, I have prepared a minimal programmatic configuration of my own, which mainly registers a CacheEntryListener that prints a simple message to System.out for each created/updated/removed/expired entry. When I start several (three or more) members, I expect each member that holds the entry (either as the operational entry or as a backup) to print the modified key/value pair exactly once. The problem is that in some cases the message shows up multiple times, which suggests to me that the listener is triggered more than once.

For example, when a simple client (which just gets the cache and puts an entry with key 123689 into it) creates an entry, the message "CREATE EVENT RECEIVED" shows up 4 times on the third member, even more often on the fourth, and so on...

CREATE EVENT RECEIVED:
Key: 123689, Value:SpecificDataType[...]

CREATE EVENT RECEIVED:
Key: 123689, Value:SpecificDataType[...]

CREATE EVENT RECEIVED: 
Key: 123689, Value:SpecificDataType[...]

It seems that the number of listener invocations multiplies somehow... What am I doing wrong? Am I missing anything in the configuration?

Code:

public static void main(String[] args) {
    Member member = Member.getInstance();

    try {
        Cache<Long, SpecificDataType> cache = member.getCache("SpecificDataTypeCache", SpecificDataType.class);
        System.out.println(cache);
    } catch (Exception e) {
        e.printStackTrace();

    }

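    // shutdown loop: keep the member alive until the user types "shutdown"
    Scanner scanner = new Scanner(System.in); // create once instead of once per iteration
    boolean shutdown = false;
    while (!shutdown) {
        if (scanner.nextLine().equals("shutdown")) {
            shutdown = true;
        }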
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    // cache.clear();
    member.shutdown();

}

public <T> Cache<Long, T> getCache(String name, Class<T> clazz) {

    // configure the cache
    MutableConfiguration<Long, T> config = new MutableConfiguration<Long, T>();
    config.setStoreByValue(true).setTypes(Long.class, clazz)
            .setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(TEN_SEC))
            .setStatisticsEnabled(false);

    // create / get cache
    Cache<Long, T> cache = cacheManager.getCache(name, Long.class, clazz);
    if (cache == null) {
        System.out.println("create cache");
        cache = cacheManager.createCache(name, config);
        // create the EntryListener
        MyCacheEntryListener<Long, T> clientListener = new MyCacheEntryListener<Long, T>();

        // using our listener, let's create a configuration
        CacheEntryListenerConfiguration<Long, T> conf = new MutableCacheEntryListenerConfiguration<Long, T>(
                FactoryBuilder.factoryOf(clientListener), null, false, true);

        // register to cache
        cache.registerCacheEntryListener(conf);
    } else {
        System.out.println("get cache");
    }

    return cache;
}

The listener is as simple as it can be:

public class MyCacheEntryListener<K, V> implements CacheEntryCreatedListener<K, V>,
        CacheEntryUpdatedListener<K, V>, CacheEntryExpiredListener<K, V>,
        CacheEntryRemovedListener<K, V>, Serializable {

    @Override
    public void onCreated(Iterable<CacheEntryEvent<? extends K, ? extends V>> cacheEntryEvents)
            throws CacheEntryListenerException {
        System.out.println("CREATE EVENT RECEIVED: ");
        System.out.println(printEvent(cacheEntryEvents));
    }

    @Override
    public void onExpired(Iterable<CacheEntryEvent<? extends K, ? extends V>> cacheEntryEvents)
            throws CacheEntryListenerException {
        System.out.println("EXPIRE EVENT RECEIVED: ");
        System.out.println(printEvent(cacheEntryEvents));
    }

    @Override
    public void onRemoved(Iterable<CacheEntryEvent<? extends K, ? extends V>> cacheEntryEvents)
            throws CacheEntryListenerException {
        System.out.println("REMOVE EVENT RECEIVED: ");
        System.out.println(printEvent(cacheEntryEvents));
    }

    @Override
    public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> cacheEntryEvents)
            throws CacheEntryListenerException {
        System.out.println("UPDATE EVENT RECEIVED: ");
        System.out.println(printEvent(cacheEntryEvents));
    }

    // builds one "Key: ..., Value:..." line per event in the batch
    private String printEvent(Iterable<CacheEntryEvent<? extends K, ? extends V>> cacheEntryEvents) {
        StringBuilder sb = new StringBuilder();
        final Iterator<CacheEntryEvent<? extends K, ? extends V>> iterator = cacheEntryEvents
                .iterator();
        while (iterator.hasNext()) {
            final CacheEntryEvent<? extends K, ? extends V> next = iterator.next();
            sb.append("Key: ");
            sb.append(next.getKey());
            sb.append(", Value:");
            sb.append(next.getValue());
            sb.append("\n");
        }
        return sb.toString();
    }

}

Any help would be appreciated!

Upvotes: 1

Views: 644

Answers (1)

Serkan Özal

Reputation: 86

I have tried your sample with 4 instances and a backup count of 3, and the listener is notified only once, as expected.

  • Are you sure that you don't register the same listener multiple times?
  • Can you also print the MyCacheEntryListener instance itself after the CREATE EVENT RECEIVED message?
  • Which Hazelcast version are you using?
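
One way to do the second check is sketched below. The helper class `ListenerProbe` and its `describe` method are hypothetical names introduced only for illustration; they are not part of JCache or Hazelcast. The sketch tags each log line with the identity hash of the listener instance and a per-JVM call counter, so repeated output can be attributed either to one instance being called several times or to several instances living in the same member.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical diagnostic helper (an assumption, not a JCache/Hazelcast API):
 * builds a log line that identifies which listener instance handled an event.
 */
public final class ListenerProbe {

    // counts all calls made through this helper in the current JVM
    private static final AtomicInteger CALLS = new AtomicInteger();

    private ListenerProbe() {
        // static helper, no instances
    }

    /**
     * @param listener  the listener instance handling the event (pass "this")
     * @param eventType a label such as "CREATE" or "UPDATE"
     * @return a line containing the event label, the listener's class name,
     *         its identity hash and the running call number
     */
    public static String describe(Object listener, String eventType) {
        int call = CALLS.incrementAndGet();
        return eventType + " EVENT RECEIVED by "
                + listener.getClass().getSimpleName()
                + "@" + Integer.toHexString(System.identityHashCode(listener))
                + " (call #" + call + " in this JVM)";
    }
}
```

Inside `onCreated` of `MyCacheEntryListener`, the first `println` could then be replaced by `System.out.println(ListenerProbe.describe(this, "CREATE"));`. If the same tag shows up several times for a single put, the same instance was invoked repeatedly, which would be consistent with it having been registered more than once. Different tags within one member most likely mean that several listener instances exist there (identity hashes are not guaranteed to be unique, so treat this as a hint rather than proof).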

Regards.

Upvotes: 3
