john

Reputation: 11669

Efficient way to write asynchronously into Cassandra using the DataStax Java driver?

I am using DataStax Java driver 3.1.0 to connect to a Cassandra cluster running Cassandra 2.0.10. I am writing asynchronously with QUORUM consistency.

  public void save(final String process, final int clientid, final long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
        }
      }, Executors.newFixedThreadPool(10));
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
  }

Below is my CacheStatement class:

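import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

public class CacheStatement {
  // CQL string -> PreparedStatement; PreparedStatement is safe to share between threads.
  private static final Map<String, PreparedStatement> cache =
      new ConcurrentHashMap<>();

  // Lazy, thread-safe singleton (initialization-on-demand holder idiom).
  private static class Holder {
    private static final CacheStatement INSTANCE = new CacheStatement();
  }

  public static CacheStatement getInstance() {
    return Holder.INSTANCE;
  }

  private CacheStatement() {}

  public BoundStatement getStatement(String cql) {
    Session session = CassUtils.getInstance().getSession();
    PreparedStatement ps = cache.get(cql);
    // no statement cached, create one and cache it now.
    if (ps == null) {
      synchronized (this) {
        ps = cache.get(cql);
        if (ps == null) {
          // keep the reference in ps too, so ps.bind() below never sees null
          ps = session.prepare(cql);
          cache.put(cql, ps);
        }
      }
    }
    // a fresh BoundStatement on every call
    return ps.bind();
  }
}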

The save method above will be called from multiple threads, and I believe BoundStatement is not thread-safe. The CacheStatement class itself is thread-safe, as shown above.
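For comparison, the same lazy, prepare-once-per-CQL-string behavior can be written with `ConcurrentHashMap.computeIfAbsent` instead of an explicit `synchronized` block. This is a minimal sketch, not the driver's API: the hypothetical `preparer` function stands in for `session.prepare(cql)` and the type parameter `P` stands in for `PreparedStatement`, so it runs without a cluster.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical lazy cache: P stands in for PreparedStatement,
// and `preparer` stands in for session.prepare(cql).
final class LazyStatementCache<P> {
  private final Map<String, P> cache = new ConcurrentHashMap<>();
  private final Function<String, P> preparer;

  LazyStatementCache(Function<String, P> preparer) {
    this.preparer = preparer;
  }

  // Returns the cached statement for this CQL string, preparing it on first use.
  P get(String cql) {
    P ps = cache.get(cql); // fast path: plain read once the entry exists
    // computeIfAbsent is atomic, so the preparer runs at most once per key
    // even when several threads request the same CQL at the same time.
    return ps != null ? ps : cache.computeIfAbsent(cql, preparer);
  }
}
```

The ConcurrentHashMap Javadoc asks that the mapping function be short, because some updates by other threads may block while it runs; with `session.prepare` that is a network round trip, paid once per distinct statement.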

Below are the settings I use to connect to Cassandra with the DataStax Java driver:

Builder builder = Cluster.builder();
cluster =
    builder
        .addContactPoints(servers.toArray(new String[servers.size()]))
        // note: this policy can retry at, or accept, a lower consistency level than QUORUM
        .withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE))
        .withPoolingOptions(poolingOptions)
        .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
        .withLoadBalancingPolicy(
            DCAwareRoundRobinPolicy.builder()
                .withLocalDc(
                    !TestUtils.isProd()
                        ? "DC2"
                        : TestUtils.getCurrentLocation().get().name().toLowerCase())
                .withUsedHostsPerRemoteDc(3)
                .build())
        .withCredentials(username, password)
        .build();

Upvotes: 2

Views: 2214

Answers (1)

xmas79

Reputation: 5180

I think what you're doing is fine. You could optimize a bit further by preparing all the statements at application startup. Everything is then already cached, so no "save" call pays the cost of preparing a statement, and nothing in your write path takes a lock.
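A minimal sketch of that idea, assuming every CQL string is known up front. To keep it runnable without a cluster, the hypothetical `preparer` function stands in for `session.prepare(cql)` and the type parameter `P` for `PreparedStatement`.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical eager cache: every CQL string is prepared once, at startup, so the
// write path is a plain lookup in an immutable map and never takes a lock.
// P stands in for PreparedStatement; `preparer` stands in for session.prepare(cql).
final class EagerStatementCache<P> {
  // final field: other threads see the fully built map once construction finishes
  private final Map<String, P> statements;

  EagerStatementCache(Collection<String> cqls, Function<String, P> preparer) {
    Map<String, P> prepared = new HashMap<>();
    for (String cql : cqls) {
      prepared.computeIfAbsent(cql, preparer); // duplicates are prepared only once
    }
    this.statements = Collections.unmodifiableMap(prepared);
  }

  // Lookup only: an unknown statement is treated as a programming error
  // instead of being prepared on the hot path.
  P get(String cql) {
    P ps = statements.get(cql);
    if (ps == null) {
      throw new IllegalArgumentException("not prepared at startup: " + cql);
    }
    return ps;
  }
}
```

With the driver you would build it once during startup, e.g. `EagerStatementCache<PreparedStatement> statements = new EagerStatementCache<>(allCql, session::prepare);` where `allCql` is a hypothetical list of every CQL string the application uses, and `save()` would call `statements.get(sql).bind()`. The trade-off is that startup now needs a reachable cluster and every statement must be listed in advance.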

BoundStatement is not thread-safe, but PreparedStatement is, and your getStatement returns a new BoundStatement on every call. Indeed, PreparedStatement.bind() is a shortcut for new BoundStatement(ps).bind(). Since no BoundStatement is ever shared between threads, your code is fine.
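To make that split concrete without a running cluster, here is a simplified model. The hypothetical `Template` class plays the role of `PreparedStatement` (immutable, shared between threads) and `Binding` plays the role of `BoundStatement` (mutable, one per call); these are illustrations, not the driver's classes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plays the role of PreparedStatement: immutable, so any number of threads may share it.
final class Template {
  private final String cql;
  private final int parameterCount;

  Template(String cql, int parameterCount) {
    this.cql = cql;
    this.parameterCount = parameterCount;
  }

  // Like ps.bind(): every call returns a fresh, independent object,
  // so concurrent callers never touch each other's mutable state.
  Binding bind() {
    return new Binding(cql, parameterCount);
  }
}

// Plays the role of BoundStatement: mutable, so it must stay confined to one thread.
final class Binding {
  final String cql;
  private final Object[] values;

  Binding(String cql, int parameterCount) {
    this.cql = cql;
    this.values = new Object[parameterCount];
  }

  Binding set(int index, Object value) {
    values[index] = value;
    return this;
  }

  // Snapshot of the bound values.
  List<Object> values() {
    return Collections.unmodifiableList(Arrays.asList(values.clone()));
  }
}
```

With the real driver the per-call pattern is exactly the one in save(): bind, set the values, execute. PreparedStatement.bind(Object... values) can also set all three values in one call, e.g. ps.bind(process, clientid, deviceid).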

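As for the thread pool, you are creating a new one on every call to Futures.addCallback. This wastes resources: none of those pools is ever shut down, so every save() call leaves behind a thread that is never released. Create one executor up front and reuse it for every callback instead. I don't use this callback style myself (I prefer handling the ResultSetFuture directly), but I saw examples in the DataStax documentation that pass MoreExecutors.sameThreadExecutor(); newer Guava versions deprecate that method in favor of MoreExecutors.directExecutor().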
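A minimal sketch of "one executor for all callbacks". To stay runnable without the driver or Guava, it uses the JDK's `CompletableFuture` in place of `ResultSetFuture`, and `handleAsync(callback, executor)` in place of `Futures.addCallback(future, callback, executor)`; the `AsyncWriter` class and its counters are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical writer: CompletableFuture stands in for ResultSetFuture, and
// handleAsync(..., callbackPool) plays the role of Futures.addCallback(..., executor).
final class AsyncWriter implements AutoCloseable {
  // One pool, created once and reused by every save() call.
  // Daemon threads, so a forgotten close() cannot keep the JVM alive.
  private final ExecutorService callbackPool =
      Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r, "write-callback");
        t.setDaemon(true);
        return t;
      });

  final AtomicInteger successes = new AtomicInteger();
  final AtomicInteger failures = new AtomicInteger();

  // Registers success/failure handling on the shared pool; no new threads per call.
  <T> CompletableFuture<Void> save(CompletableFuture<T> write) {
    return write.handleAsync((result, error) -> {
      if (error == null) {
        successes.incrementAndGet(); // e.g. logger.logInfo("successfully written")
      } else {
        failures.incrementAndGet();  // e.g. logger.logError("error= ", error)
      }
      return null;
    }, callbackPool);
  }

  @Override
  public void close() {
    callbackPool.shutdown();
  }
}
```

With the real driver the equivalent is a single executor stored in a static final field and passed to every Futures.addCallback call. When the callback is as cheap as a log line, MoreExecutors.directExecutor() avoids the pool entirely, but the callback then typically runs on one of the driver's internal I/O threads, so it must never block.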

Upvotes: 1
