Reputation: 41
I am evaluating Ignite as a caching layer for our architecture. When trying out Ignite Java thin client for the use case mentioned below, I do not find any pointer(s) in ignite doc/any forum as to how this is being tackled by the ignite community. Any pointer(s) will be helpful before I go ahead and use my custom solution.
Use case: All nodes in an ignite cluster go down and come back up. Basically, thin client loses connection to all cluster nodes for some time.
What I was expecting I am using continuous query and register for disconnect events. Hence, I was expecting some disconnect event which I never got. Reference code below.
public static QueryCursor<Cache.Entry<String, String>> subscribeForDataUpdates(ClientCache<String, String> entityCache,
AtomicLong totalUpdatesTracker) {
ClientDisconnectListener disconnectListener = reason ->
System.out.printf("Client: %s received disconnect event with reason:%s %n",
getClientIpAddr(),
reason.getMessage());
ContinuousQuery<String, String> continuousQuery = new ContinuousQuery<>();
continuousQuery.setLocalListener(new CacheUpdateListener(entityCache.getName(), totalUpdatesTracker));
QueryCursor<Cache.Entry<String, String>> queryCursor = entityCache.query(continuousQuery, disconnectListener);
System.out.printf("Client: %s - subscribed for change notification(s) for entity cache: %s %n",
getClientIpAddr(),
entityCache.getName());
return queryCursor;
}
What I ended up doing
Writing my own checker to re-initialize the thin client connection to ignite cluster and re-subscribing for continuous query updates.
import io.vavr.control.Try;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.client.IgniteClient;
import javax.cache.Cache;
import javax.inject.Inject;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static com.cisco.ignite.consumer.CacheChangeSubscriber.subscribeForDataUpdates;
import static com.cisco.ignite.consumer.Utils.addShutDownHookToCloseCacheUpdates;
import static com.cisco.ignite.consumer.Utils.getClientIpAddr;
public class ClusterConnectionChecker implements Runnable {
private static final List<QueryCursor<Cache.Entry<String, String>>> querySubscriptions = new ArrayList<>();
@Inject
private CacheChangeSubscriber cacheChangeSubscriber;
private IgniteClient thinClientInstance;
private final long secondsDelayBetweenChecks;
private final List<String> cacheNames;
private final AtomicLong totalUpdatesTracker;
private boolean needsReSubscription = false;
public ClusterConnectionChecker(IgniteClient client, long delayBetweenChecks,
List<String> cacheNames, AtomicLong totalUpdatesTracker) {
this.thinClientInstance = client;
this.secondsDelayBetweenChecks = delayBetweenChecks;
this.cacheNames = cacheNames;
this.totalUpdatesTracker = totalUpdatesTracker;
}
@Override
public void run() {
while(!Thread.interrupted()) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(secondsDelayBetweenChecks));
boolean isClusterConnectionActive = isConnectionToClusterActive();
if (!isClusterConnectionActive) {
needsReSubscription = true;
System.out.printf("Time: %s | Connection to ignite cluster is not active !!! %n",
LocalDateTime.now());
reInitializeThinClient();
reSubscribeForUpdates();
} else {
// we only need to conditionally re-subscribe
if (needsReSubscription) {
reSubscribeForUpdates();
}
}
} catch (InterruptedException ie) {
// do nothing - just reset the interrupt flag.
Thread.currentThread().interrupt();
}
}
}
private boolean isConnectionToClusterActive() {
return Try.of(() -> {
return thinClientInstance.cluster().state().active();
}).recover(ex -> {
return false;
}).getOrElse(false);
}
private void reInitializeThinClient() {
Try.of(() -> {
thinClientInstance = cacheChangeSubscriber.createThinClientInstance();
if (thinClientInstance.cluster().state().active()) {
System.out.printf("Client: %s | Thin client instance was re-initialized since it was not active %n",
getClientIpAddr());
}
return thinClientInstance;
}).onFailure(th -> System.out.printf("Client: %s | Failed to re-initialize ignite cluster connection. " +
"Will re-try after:%d seconds %n", getClientIpAddr(),secondsDelayBetweenChecks));
}
private void reSubscribeForUpdates() {
if (isConnectionToClusterActive()) {
System.out.printf("Client: %s | Re-subscribing for cache updates after cluster connection re-init... %n",
getClientIpAddr());
// re-set the counter to 0 since we are re-subscribing fresh
totalUpdatesTracker.set(0);
cacheNames.forEach(name -> querySubscriptions.add(subscribeForDataUpdates(
thinClientInstance.getOrCreateCache(name),
totalUpdatesTracker)));
addShutDownHookToCloseCacheUpdates(querySubscriptions, thinClientInstance);
needsReSubscription = false;
}
}
}
Upvotes: 1
Views: 603