ethrbunny

Reputation: 10469

Cassandra - using PreparedStatement with ListenableFuture

I'm attempting to make async writes to a Cassandra cluster using ListenableFuture as follows:

private static Cluster cluster = null;
private ListeningExecutorService executorService;
private PreparedStatement preparedStatement;
private Session session = null;

... 

executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(POOL_SIZE));
...

public void writeValue(Tuple tuple) {
    ListenableFuture<String> future = executorService.submit(new Callable<String>() {
       @Override
       public String call() throws Exception {
           if(session == null) {
               session = getCluster().connect("dbname");
               preparedStatement = session.prepare(queryString);
           }

           try {
               BoundStatement boundStatement = preparedStatement.bind(tuple values);
               session.execute(boundStatement);
           } catch(Exception exception) {
                // handle exception
           }

           return null;
       }
    });
}

If I set POOL_SIZE to 1 everything works.
If I set POOL_SIZE to > 1 I get errors as follows:

Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Tried to execute unknown prepared query : 0x75c5b41b9f07afa5384a69790503f963. You may have used a PreparedStatement that was created with another Cluster instance.

So I moved session and preparedStatement into local variables. Then I get warnings about "Re-preparing already prepared query ...", plus it creates a new session every time.

I want to reuse as much as possible. What am I doing wrong and what are my options?

Would it help to make this class static?

Upvotes: 1

Views: 3628

Answers (2)

Olivier Michallat

Reputation: 2312

You might also want to use the driver's asynchronous API. Instead of calling execute (which will block your thread for the duration of the query), call executeAsync and register a callback on the resulting future to process the result.

If that callback is expensive and you don't want to block the driver's internal I/O thread, then you can provide your own executor:

ListenableFuture<ResultSet> future = session.executeAsync(statement);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
    public void onSuccess(ResultSet rs) { ... }
    public void onFailure(Throwable t) { ... }
}, executorService);

This page in the documentation has a few tips on async programming.
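The callback-on-your-own-executor shape can be tried without a cluster. The sketch below swaps in the JDK's CompletableFuture, which is an analogue and not the driver's or Guava's API: fakeQuery is a hypothetical stand-in for executeAsync, and handleAsync with an explicit executor plays the role of Futures.addCallback(future, callback, executorService).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AsyncCallbackSketch {

    // Hypothetical stand-in for session.executeAsync(statement); no driver involved.
    static CompletableFuture<String> fakeQuery(String value) {
        return CompletableFuture.supplyAsync(() -> "stored:" + value);
    }

    // Runs the completion callback on a caller-supplied executor.
    // Returns {callback result, name of the thread that ran the callback}.
    static String[] runCallback(String value) {
        ExecutorService callbackPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-pool"));
        try {
            CompletableFuture<String[]> done = fakeQuery(value).handleAsync(
                    (result, error) -> new String[] {
                        // error == null corresponds to onSuccess, otherwise onFailure
                        error == null ? result : "failed: " + error,
                        Thread.currentThread().getName()
                    },
                    callbackPool);
            return done.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            callbackPool.shutdown();
        }
    }
}
```

Because the executor is passed explicitly, the callback body runs on the thread named callback-pool and not on whichever thread completed the future. That is the same reason for handing executorService to addCallback.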

Upvotes: 2

Sotirios Delimanolis

Reputation: 280132

You have all sorts of race conditions here and execution isn't thread safe.

Cluster, Session, and PreparedStatement are each meant to be application-scoped singletons, i.e. you only need one of each (one PreparedStatement per query).

However, you are recreating the Session and potentially preparing the same PreparedStatement multiple times.

Don't. Initialize your Session once, in a constructor or some location that only runs once and prepare your statements at the same time. Then use the Session and PreparedStatement where appropriate.
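A minimal sketch of that layout follows. To keep it runnable without a Cassandra cluster, StubSession and StubStatement are hypothetical stand-ins for the driver's Session and PreparedStatement (they are not driver classes) and only count calls. The query string is made up as well. What matters is the shape: the writer prepares once in its constructor, keeps the results in final fields, and every pool thread reuses them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the driver's PreparedStatement; not a driver class.
final class StubStatement {
    final String query;
    StubStatement(String query) { this.query = query; }
}

// Hypothetical stand-in for the driver's Session; it only counts calls.
final class StubSession {
    final AtomicInteger prepareCalls = new AtomicInteger();
    final AtomicInteger executeCalls = new AtomicInteger();

    StubStatement prepare(String query) {
        prepareCalls.incrementAndGet();
        return new StubStatement(query);
    }

    void execute(StubStatement statement, Object value) {
        executeCalls.incrementAndGet();
    }
}

final class ValueWriter {
    // Final fields set once in the constructor: no lazy init inside tasks.
    private final StubSession session;
    private final StubStatement insert;
    private final ExecutorService pool;

    ValueWriter(StubSession session, String query, int poolSize) {
        this.session = session;
        this.insert = session.prepare(query);   // prepared exactly once
        this.pool = Executors.newFixedThreadPool(poolSize);
    }

    Future<?> writeValue(Object value) {
        // Every task reuses the same session and the same statement.
        return pool.submit(() -> session.execute(insert, value));
    }

    void shutdown() { pool.shutdown(); }

    // Submits the given number of writes; returns {prepare calls, execute calls}.
    static int[] demo(int poolSize, int writes) {
        StubSession session = new StubSession();
        ValueWriter writer =
                new ValueWriter(session, "INSERT INTO demo (id) VALUES (?)", poolSize);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (int i = 0; i < writes; i++) {
                pending.add(writer.writeValue(i));
            }
            for (Future<?> f : pending) {
                f.get();   // wait for every write to finish
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            writer.shutdown();
        }
        return new int[] { session.prepareCalls.get(), session.executeCalls.get() };
    }
}
```

With this structure the pool size no longer matters: ValueWriter.demo(4, 100) performs one prepare and 100 executes, because nothing inside a task ever assigns a shared field.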


With a single-threaded executor, everything runs as if it were synchronous. When you add more threads, many of them may call

session.prepare(queryString);

at the same time. Or the PreparedStatement you use here

BoundStatement boundStatement = preparedStatement.bind(tuple values);
session.execute(boundStatement);

might be different from the one you initialized

preparedStatement = session.prepare(queryString);

even within the same thread of execution. Or you might be attempting to execute the PreparedStatement with a different Session than the one used to initialize it.
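The check-then-act problem can be reproduced without Cassandra. In the sketch below, the class and its fields are hypothetical and a plain Object stands in for the session. A CyclicBarrier is added purely to force the unlucky interleaving every time, so that both threads pass the null check before either one assigns the field. Real code hits this only intermittently.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class LazyInitRace {
    // Unsynchronized shared field, initialized lazily as in the question.
    private Object session;
    private final AtomicInteger connects = new AtomicInteger();
    // Test scaffolding only: holds each thread until both have seen session == null.
    private final CyclicBarrier bothPastNullCheck = new CyclicBarrier(2);

    private void write() throws Exception {
        if (session == null) {                              // check
            bothPastNullCheck.await(5, TimeUnit.SECONDS);
            connects.incrementAndGet();                     // stands in for connect() + prepare()
            session = new Object();                         // act
        }
    }

    // Runs two writer threads and returns how many times "connect" happened.
    static int countConnects() {
        LazyInitRace race = new LazyInitRace();
        Runnable task = () -> {
            try {
                race.write();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        Thread a = new Thread(task);
        Thread b = new Thread(task);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return race.connects.get();
    }
}
```

Both threads initialize, so the field is assigned twice and the first value is silently replaced. In the question's code that means a statement prepared on one Session can end up executed on another.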


Here are some things you should be doing when using CQL drivers.

  1. Is a prepared statement bound on one session or is it useable on another session?

    A prepared statement is derived from a particular session instance. So when you prepare a statement and it is sent over to the server, it is sent to the cluster with which this session instance is associated.

The javadoc of Session states

Session instances are thread-safe and usually a single instance is enough per application.

Upvotes: 5
