napster

Reputation: 193

Cassandra Async reads and writes, Best practices

To set the context: we have 4 tables in Cassandra. One is the data table and the remaining three are search tables (let's assume DATA, SEARCH1, SEARCH2 and SEARCH3 are the tables).

We have an initial load requirement of up to 15k rows in one request for the DATA table, and hence for the search tables to keep them in sync. We do it with batch inserts, each batch containing 4 queries (one per table) to keep consistency.

But for every batch we need to read the data first. If the row exists, we update only the DATA table's lastUpdatedDate column; otherwise we insert into all 4 tables.

Below is a code snippet of how we are doing it:

public List<Item> loadData(List<Item> items) {
    CountDownLatch latch = new CountDownLatch(items.size());
    ForkJoinPool pool = new ForkJoinPool(6);
    pool.submit(() -> items.parallelStream().forEach(item -> {
      BatchStatement batch = prepareBatchForCreateOrUpdate(item);
      batch.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
      ResultSetFuture future = getSession().executeAsync(batch);
      Futures.addCallback(future, new AsyncCallBack(latch), pool);
    }));

    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }

    //TODO Consider what to do with the failed items: retry, or remove them from the returned list?
    return items;
}

private BatchStatement prepareBatchForCreateOrUpdate(Item item) {
    BatchStatement batch = new BatchStatement();
    Item existingItem = getExisting(item); //synchronous read
    if (null != existingItem) {
      existingItem.setLastUpdatedDateTime(new Timestamp(System.currentTimeMillis()));
      batch.add(existingItem);
      return batch;
    }

    batch.add(item);
    batch.add(convertItemToSearch1(item));
    batch.add(convertItemToSearch2(item));
    batch.add(convertItemToSearch3(item));

    return batch;
  }

class AsyncCallBack implements FutureCallback<ResultSet> {
    private CountDownLatch latch;

    AsyncCallBack(CountDownLatch latch) {
      this.latch = latch;
    }

    // Count down the latch on both success and failure so that the thread waiting on latch.await() knows when all the async calls have completed.
    @Override
    public void onSuccess(ResultSet result) {
      latch.countDown();
    }

    @Override
    public void onFailure(Throwable t) {
      LOGGER.warn("Failed async query execution, Cause:{}:{}", t.getCause(), t.getMessage());
      latch.countDown();
    }
  }

The execution takes about 1.5 to 2 minutes for 15k items, including the network round trip between the application and the Cassandra cluster (both reside on the same DNS but in different pods on Kubernetes).

We have ideas to make even the read call getExisting(item) async, but handling the failure cases is becoming complex. Is there a better approach for data loads into Cassandra (considering only async writes through the DataStax Enterprise Java driver)?

Upvotes: 1

Views: 3376

Answers (1)

Alex Ott

Reputation: 87154

First thing - batches in Cassandra are a different thing than in relational databases, and by using them you're putting more load on the cluster.

Regarding making everything async, I thought about the following possibility:

  1. make a query to the DB, obtain a Future and add a listener to it that will be executed when the query finishes (override onSuccess);
  2. from that method, schedule the execution of the next action based on the result obtained from Cassandra.
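A minimal, driver-agnostic sketch of that read-then-act chaining, using `CompletableFuture` from the JDK (with the real driver, the futures returned by `executeAsync()` would be adapted to completion stages; `readExisting`, `updateLastUpdated` and `insertIntoAllTables` are hypothetical stand-ins for the actual queries):

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class AsyncUpsert {
    // Hypothetical async read: resolves to the existing row, if any.
    static CompletableFuture<Optional<String>> readExisting(String key) {
        return CompletableFuture.completedFuture(
            key.startsWith("existing") ? Optional.of(key) : Optional.empty());
    }

    // Hypothetical async writes; with the real driver these would wrap executeAsync().
    static CompletableFuture<String> updateLastUpdated(String key) {
        return CompletableFuture.completedFuture("updated:" + key);
    }

    static CompletableFuture<String> insertIntoAllTables(String key) {
        return CompletableFuture.completedFuture("inserted:" + key);
    }

    // Chain the read into the next action without blocking any thread:
    // the decision between update and insert happens when the read completes.
    static CompletableFuture<String> upsert(String key) {
        return readExisting(key).thenCompose(existing ->
            existing.isPresent() ? updateLastUpdated(key)
                                 : insertIntoAllTables(key));
    }
}
```

The key point is `thenCompose` (the equivalent of adding a listener and issuing the next query from it): no thread blocks between the read and the conditional write, so failures surface on the composed future and can be handled in one place.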

One thing you need to make sure of is that you don't issue too many simultaneous requests at the same time. In version 3 of the protocol, you can have up to 32k in-flight requests per connection, but in your case you may issue up to 60k (4x15k) requests. I'm using the following wrapper around the Session class to limit the number of in-flight requests.
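The original wrapper code isn't reproduced here, but the idea can be sketched with a plain `Semaphore`: acquire a permit before issuing the query, and release it when the returned future completes (names like `ThrottledExecutor` are illustrative, not the driver's API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

public class ThrottledExecutor {
    private final Semaphore permits;

    public ThrottledExecutor(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks the submitting thread until a permit is free, issues the async
    // call, and releases the permit when the future completes (success or failure).
    public <T> CompletableFuture<T> execute(Supplier<CompletableFuture<T>> query) {
        permits.acquireUninterruptibly();
        CompletableFuture<T> future = query.get();
        future.whenComplete((result, error) -> permits.release());
        return future;
    }
}
```

With the real driver, `query.get()` would be the `session.executeAsync(statement)` call, and `maxInFlight` would be set safely below the protocol's per-connection limit.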

Upvotes: 2
