Dibakar Sen
Dibakar Sen

Reputation: 41

Ignite thin client does not reconnect if all nodes in a cluster go down & come back up

I am evaluating Ignite as a caching layer for our architecture. While trying out the Ignite Java thin client for the use case described below, I could not find any pointers in the Ignite documentation or in any forum on how the Ignite community handles it. Any pointers would be helpful before I go ahead and use my custom solution.

Use case: All nodes in an ignite cluster go down and come back up. Basically, thin client loses connection to all cluster nodes for some time.

What I was expecting: I am using a continuous query and registering a listener for disconnect events. Hence, I was expecting to receive a disconnect event, but I never got one. Reference code below.

    /**
     * Starts a continuous query on the given cache and returns its cursor.
     *
     * <p>A {@code CacheUpdateListener} built from the cache name and the supplied tracker is
     * installed as the query's local listener, and a disconnect listener that prints the
     * disconnect reason is passed alongside the query.
     *
     * @param entityCache         cache to subscribe to
     * @param totalUpdatesTracker counter handed to the {@code CacheUpdateListener}
     *                            (presumably incremented per update; confirm in that class)
     * @return the cursor of the started continuous query; the caller is responsible for closing it
     */
    public static QueryCursor<Cache.Entry<String, String>> subscribeForDataUpdates(ClientCache<String, String> entityCache,
                                                                                   AtomicLong totalUpdatesTracker) {
        // The disconnect callback may be invoked without a cause, so guard before
        // dereferencing it; otherwise the callback itself fails with an NPE.
        ClientDisconnectListener disconnectListener = reason ->
                System.out.printf("Client: %s received disconnect event with reason:%s %n",
                        getClientIpAddr(),
                        reason == null ? "unknown" : reason.getMessage());
        ContinuousQuery<String, String> continuousQuery = new ContinuousQuery<>();
        continuousQuery.setLocalListener(new CacheUpdateListener(entityCache.getName(), totalUpdatesTracker));
        QueryCursor<Cache.Entry<String, String>> queryCursor = entityCache.query(continuousQuery, disconnectListener);
        System.out.printf("Client: %s - subscribed for change notification(s) for entity cache: %s %n",
                getClientIpAddr(),
                entityCache.getName());
        return queryCursor;
    }

What I ended up doing

I wrote my own checker that re-initializes the thin client connection to the Ignite cluster and re-subscribes for continuous query updates.

import io.vavr.control.Try;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.client.IgniteClient;

import javax.cache.Cache;
import javax.inject.Inject;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static com.cisco.ignite.consumer.CacheChangeSubscriber.subscribeForDataUpdates;
import static com.cisco.ignite.consumer.Utils.addShutDownHookToCloseCacheUpdates;
import static com.cisco.ignite.consumer.Utils.getClientIpAddr;

/**
 * Background task that periodically verifies the thin client's connection to the Ignite
 * cluster and, when the connection is found inactive, creates a new thin client and
 * re-subscribes the continuous queries for the configured caches.
 *
 * <p>Intended to be run on a single dedicated thread; the mutable state of this class is
 * not synchronized.
 */
public class ClusterConnectionChecker implements Runnable {
    /**
     * Cursors of the continuous queries opened by this checker. Kept per instance so that
     * closing stale cursors on re-subscription never touches another checker's cursors.
     */
    private final List<QueryCursor<Cache.Entry<String, String>>> querySubscriptions = new ArrayList<>();
    @Inject
    private CacheChangeSubscriber cacheChangeSubscriber;
    private IgniteClient thinClientInstance;
    private final long secondsDelayBetweenChecks;
    private final List<String> cacheNames;
    private final AtomicLong totalUpdatesTracker;
    /** Set when the connection was lost and the subscriptions still have to be restored. */
    private boolean needsReSubscription = false;


    /**
     * @param client              thin client whose connection is monitored
     * @param delayBetweenChecks  pause between two connection checks, in seconds
     * @param cacheNames          names of the caches to re-subscribe to after a reconnect
     * @param totalUpdatesTracker counter that is reset to 0 on every re-subscription
     */
    public ClusterConnectionChecker(IgniteClient client, long delayBetweenChecks,
                                    List<String> cacheNames, AtomicLong totalUpdatesTracker) {
        this.thinClientInstance = client;
        this.secondsDelayBetweenChecks = delayBetweenChecks;
        this.cacheNames = cacheNames;
        this.totalUpdatesTracker = totalUpdatesTracker;
    }

    /**
     * Sleeps for the configured delay, checks the connection and recovers it if needed,
     * repeating until the running thread is interrupted.
     */
    @Override
    public void run() {
        final Thread self = Thread.currentThread();
        // isInterrupted() does not clear the flag, so the interruption stays observable
        // to whoever owns this thread after the loop exits.
        while (!self.isInterrupted()) {
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(secondsDelayBetweenChecks));
                checkAndRecover();
            } catch (InterruptedException ie) {
                // Restore the flag so the loop condition terminates the task.
                self.interrupt();
            } catch (RuntimeException ex) {
                // A failed (re-)subscription must not kill the checker; try again next round.
                System.out.printf("Time: %s | Connection check failed: %s. Will re-try after:%d seconds %n",
                        LocalDateTime.now(), ex, secondsDelayBetweenChecks);
            }
        }
    }

    /** Performs one check cycle: reconnects and/or re-subscribes when required. */
    private void checkAndRecover() {
        if (!isConnectionToClusterActive()) {
            needsReSubscription = true;
            System.out.printf("Time: %s | Connection to ignite cluster is not active !!! %n",
                    LocalDateTime.now());
            reInitializeThinClient();
            reSubscribeForUpdates();
        } else if (needsReSubscription) {
            // Connection is back but an earlier re-subscription attempt did not complete.
            reSubscribeForUpdates();
        }
    }

    /**
     * @return {@code true} if the cluster reports an active state through the current thin
     *         client; {@code false} if it reports inactive or the call fails for any reason
     */
    private boolean isConnectionToClusterActive() {
        return Try.of(() -> thinClientInstance.cluster().state().active()).getOrElse(false);
    }

    /**
     * Replaces the current thin client with a freshly created one and closes the old
     * instance. If creating the new client fails, the old one is kept and the failure
     * is logged.
     */
    private void reInitializeThinClient() {
        final IgniteClient staleClient = thinClientInstance;
        try {
            final IgniteClient freshClient = cacheChangeSubscriber.createThinClientInstance();
            thinClientInstance = freshClient;
            // Release the resources held by the replaced client, unless the factory
            // handed back the very same instance.
            if (staleClient != freshClient) {
                closeQuietly(staleClient);
            }
            if (freshClient.cluster().state().active()) {
                System.out.printf("Client: %s | Thin client instance was re-initialized since it was not active %n",
                        getClientIpAddr());
            }
        } catch (Exception ex) {
            System.out.printf("Client: %s | Failed to re-initialize ignite cluster connection. " +
                    "Will re-try after:%d seconds %n", getClientIpAddr(), secondsDelayBetweenChecks);
        }
    }

    /**
     * Re-creates the continuous query subscriptions for all configured caches, provided the
     * cluster connection is active. Cursors from a previous subscription round are closed
     * first so they do not accumulate.
     */
    private void reSubscribeForUpdates() {
        if (!isConnectionToClusterActive()) {
            return;
        }
        System.out.printf("Client: %s | Re-subscribing for cache updates after cluster connection re-init... %n",
                getClientIpAddr());
        releaseStaleSubscriptions();
        // re-set the counter to 0 since we are re-subscribing fresh
        totalUpdatesTracker.set(0);
        for (String name : cacheNames) {
            querySubscriptions.add(subscribeForDataUpdates(
                    thinClientInstance.getOrCreateCache(name),
                    totalUpdatesTracker));
        }
        // NOTE(review): this is invoked on every re-subscription; confirm the helper
        // tolerates repeated registration.
        addShutDownHookToCloseCacheUpdates(querySubscriptions, thinClientInstance);
        // Only reached when every cache was subscribed; otherwise the next cycle retries.
        needsReSubscription = false;
    }

    /** Closes and forgets every cursor left over from an earlier subscription round. */
    private void releaseStaleSubscriptions() {
        for (QueryCursor<Cache.Entry<String, String>> cursor : querySubscriptions) {
            closeQuietly(cursor);
        }
        querySubscriptions.clear();
    }

    /** Closes the resource, ignoring any failure, since it is being discarded anyway. */
    private static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception ignored) {
            // Best effort: the resource typically belongs to a connection that is already dead.
        }
    }
}

Upvotes: 1

Views: 603

Answers (0)

Related Questions