Pramit Bhaumik

Reputation: 51

Datastax Java Driver Custom Retry Policy

I have written a custom retry policy class that takes the number of retries the driver should perform in onWriteTimeout, onUnavailable, and onReadTimeout.

public class CustomRetryPolicy implements RetryPolicy {

  private static final Logger LOG = LoggerFactory.getLogger(CustomRetryPolicy.class);

  @VisibleForTesting
  public static final String RETRYING_ON_READ_TIMEOUT =
      "[{}] Retrying on read timeout on same host (consistency: {}, required responses: {}, "
          + "received responses: {}, data retrieved: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_WRITE_TIMEOUT =
      "[{}] Retrying on write timeout on same host (consistency: {}, write type: {}, "
          + "required acknowledgments: {}, received acknowledgments: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_UNAVAILABLE =
      "[{}] Retrying on unavailable exception on next host (consistency: {}, "
          + "required replica: {}, alive replica: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_ABORTED =
      "[{}] Retrying on aborted request on next host (retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_ERROR =
      "[{}] Retrying on node error on next host (retries: {})";

  private static final String LOG_PREFIX = "DATASTORE-CASSANDRA";

  private final int readAttempts;
  private final int writeAttempts;
  private final int unavailableAttempts;

  public CustomRetryPolicy(int readAttempts, int writeAttempts, int unavailableAttempts) {
    this.readAttempts = readAttempts;
    this.writeAttempts = writeAttempts;
    this.unavailableAttempts = unavailableAttempts;
  }

  @Override
  public RetryDecision onReadTimeout(Request request, ConsistencyLevel cl, int blockFor,
      int received, boolean dataPresent, int retryCount) {
    RetryDecision decision = (retryCount < readAttempts && received >= blockFor && !dataPresent)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_READ_TIMEOUT, LOG_PREFIX, cl, blockFor, received, false, retryCount);
    }

    return decision;
  }

  @Override
  public RetryDecision onWriteTimeout(Request request, ConsistencyLevel cl, WriteType writeType,
      int blockFor, int received, int retryCount) {
    RetryDecision decision = (retryCount < writeAttempts && writeType == DefaultWriteType.BATCH_LOG)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_WRITE_TIMEOUT, LOG_PREFIX, cl, writeType, blockFor, received,
          retryCount);
    }
    return decision;
  }

  @Override
  public RetryDecision onUnavailable(Request request, ConsistencyLevel cl, int required, int alive,
      int retryCount) {
    RetryDecision decision =
        (retryCount < unavailableAttempts) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_UNAVAILABLE, LOG_PREFIX, cl, required, alive, retryCount);
    }

    return decision;
  }

  @Override
  public RetryDecision onRequestAborted(Request request, Throwable error, int retryCount) {
    RetryDecision decision =
        (error instanceof ClosedConnectionException || error instanceof HeartbeatException)
            ? RetryDecision.RETRY_NEXT
            : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_ABORTED, LOG_PREFIX, retryCount, error);
    }

    return decision;
  }

  @Override
  public RetryDecision onErrorResponse(Request request, CoordinatorException error,
      int retryCount) {
    RetryDecision decision =
        (error instanceof ReadFailureException || error instanceof WriteFailureException)
            ? RetryDecision.RETHROW
            : RetryDecision.RETRY_NEXT;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_ERROR, LOG_PREFIX, retryCount, error);
    }

    return decision;
  }

  @Override
  public void close() {
    // Nothing to do
  }
}

I am using DataStax Java driver 4.6.0. The problem is that I can't pass an instance of this class to CqlSessionBuilder, which was possible like this

RetryPolicy rc = new CustomRetryPolicy(3, 3, 2);
Cluster cluster = Cluster.builder().addContactPoint("192.168.0.0").withRetryPolicy(rc).build();

in older versions of the driver. I tried DriverConfigLoader, but it only offers an option to pass the custom class name.

Could you please suggest how to do this?

Upvotes: 2

Views: 1995

Answers (1)

Alex Ott

Reputation: 87174

If you look at the implementation of DefaultRetryPolicy and the example CustomRetryPolicy, you'll see that both take two constructor parameters: a context of type DriverContext and a string with the profile name. You can use the context to get the DriverConfig via getConfig(), and then call getProfile() on that config to pull the configuration values your custom policy needs. You can put your own values into the configuration file and read them inside your retry policy, something like this:

datastax-java-driver {
  advanced.retry-policy {
    class = DefaultRetryPolicy
  }
  profiles {
    custom-retries {
      advanced.retry-policy {
        class = CustomRetryPolicy
        custom-policy {
           read-attempts = 3
           write-attempts = 2
           ...
        }
      }
    }
  }
}
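
As a rough illustration of the approach above, here is a minimal sketch of the question's policy reworked to use that two-argument constructor, written against the 4.6 RetryPolicy interface. The CustomRetryOption enum, the unavailable-attempts key, and the fallback default of 1 are assumptions made for this example, not part of the driver or of the configuration shown above. The decision logic is copied from the question, with logging dropped for brevity.

```java
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.config.DriverOption;
import com.datastax.oss.driver.api.core.connection.ClosedConnectionException;
import com.datastax.oss.driver.api.core.connection.HeartbeatException;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.retry.RetryDecision;
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
import com.datastax.oss.driver.api.core.servererrors.CoordinatorException;
import com.datastax.oss.driver.api.core.servererrors.DefaultWriteType;
import com.datastax.oss.driver.api.core.servererrors.ReadFailureException;
import com.datastax.oss.driver.api.core.servererrors.WriteFailureException;
import com.datastax.oss.driver.api.core.servererrors.WriteType;
import com.datastax.oss.driver.api.core.session.Request;

// Hypothetical option enum: paths are relative to the datastax-java-driver root
// and mirror the custom-policy block in the configuration sketch.
enum CustomRetryOption implements DriverOption {
  READ_ATTEMPTS("advanced.retry-policy.custom-policy.read-attempts"),
  WRITE_ATTEMPTS("advanced.retry-policy.custom-policy.write-attempts"),
  // Assumed key name; the configuration sketch does not spell this one out.
  UNAVAILABLE_ATTEMPTS("advanced.retry-policy.custom-policy.unavailable-attempts");

  private final String path;

  CustomRetryOption(String path) {
    this.path = path;
  }

  @Override
  public String getPath() {
    return path;
  }
}

public class CustomRetryPolicy implements RetryPolicy {

  private final int readAttempts;
  private final int writeAttempts;
  private final int unavailableAttempts;

  // The driver instantiates the policy reflectively with these two arguments.
  public CustomRetryPolicy(DriverContext context, String profileName) {
    DriverExecutionProfile profile = context.getConfig().getProfile(profileName);
    // Fall back to 1 attempt (an arbitrary choice) when a key is not set.
    this.readAttempts = profile.getInt(CustomRetryOption.READ_ATTEMPTS, 1);
    this.writeAttempts = profile.getInt(CustomRetryOption.WRITE_ATTEMPTS, 1);
    this.unavailableAttempts = profile.getInt(CustomRetryOption.UNAVAILABLE_ATTEMPTS, 1);
  }

  @Override
  public RetryDecision onReadTimeout(Request request, ConsistencyLevel cl, int blockFor,
      int received, boolean dataPresent, int retryCount) {
    return (retryCount < readAttempts && received >= blockFor && !dataPresent)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;
  }

  @Override
  public RetryDecision onWriteTimeout(Request request, ConsistencyLevel cl, WriteType writeType,
      int blockFor, int received, int retryCount) {
    return (retryCount < writeAttempts && writeType == DefaultWriteType.BATCH_LOG)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;
  }

  @Override
  public RetryDecision onUnavailable(Request request, ConsistencyLevel cl, int required,
      int alive, int retryCount) {
    return (retryCount < unavailableAttempts) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW;
  }

  @Override
  public RetryDecision onRequestAborted(Request request, Throwable error, int retryCount) {
    return (error instanceof ClosedConnectionException || error instanceof HeartbeatException)
        ? RetryDecision.RETRY_NEXT
        : RetryDecision.RETHROW;
  }

  @Override
  public RetryDecision onErrorResponse(Request request, CoordinatorException error,
      int retryCount) {
    return (error instanceof ReadFailureException || error instanceof WriteFailureException)
        ? RetryDecision.RETHROW
        : RetryDecision.RETRY_NEXT;
  }

  @Override
  public void close() {
    // Nothing to do
  }
}
```

With this in place the driver creates the policy itself, so no instance needs to be passed to the session builder. As far as I know, an unqualified class name in the configuration is resolved against the driver's own internal retry package, so a custom class most likely needs its fully qualified name there (for example com.example.CustomRetryPolicy, a made-up package). A statement would then opt in to the profile with setExecutionProfileName("custom-retries").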

Upvotes: 3
