Reputation: 10469
Im attempting to make async writes to a Cassandra cluster using ListenableFuture as follows:
private static Cluster cluster = null;
private ListeningExecutorService executorService;
private PreparedStatement preparedStatement;
private Session session = null;
...
executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(POOL_SIZE));
...
public void writeValue(Tuple tuple) {
ListenableFuture<String> future = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
if(session == null) {
session = getCluster().connect("dbname");
preparedStatement = session.prepare(queryString);
}
try {
BoundStatement boundStatement = preparedStatement.bind(tuple values);
session.execute(boundStatement);
} catch(Exception exception) {
// handle exception
}
return null;
}
});
If I set POOL_SIZE to 1 everything works.
If I set POOL_SIZE to > 1 I get errors as follows:
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Tried to execute unknown prepared query : 0x75c5b41b9f07afa5384a69790503f963. You may have used a PreparedStatement that was created with another Cluster instance.
So I session
and preparedStatement
into local vars. Then I get warnings about Re-preparing already prepared query ...
plus it's creating a new session every time.
I want to reuse as much as possible. What am I doing wrong and what are my options?
Would it help to make this class static?
Upvotes: 1
Views: 3628
Reputation: 2312
You might also want to use the driver's asynchronous API. Instead of calling execute
(which will block your thread for the duration of the query), call executeAsync
and register a callback on the resulting future to process the result.
If that callback is expensive and you don't want to block the driver's internal I/O thread, then you can provide your own executor:
ListenableFuture<ResultSet> future = session.executeAsync(statement);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
public void onSuccess(ResultSet rs) { ... }
public void onFailure(Throwable t) { ... }
},
executorService);
This page in the documentation has a few tips on async programming.
Upvotes: 2
Reputation: 280132
You have all sorts of race conditions here and execution isn't thread safe.
Each of Cluster
, Session
, and PreparedStatement
are meant to be application scoped singletons, ie. you only need one (one for each query for PreparedStatement
).
However, you are recreating a Session
and potentially preparing PreparedStatement
multiple times.
Don't. Initialize your Session
once, in a constructor or some location that only runs once and prepare your statements at the same time. Then use the Session
and PreparedStatement
where appropriate.
Using a single threaded executor, everything runs as if it was synchronous. When you add more threads, many of them may call
session.prepare(queryString);
at the same time. Or the PreparedStatement
you use here
BoundStatement boundStatement = preparedStatement.bind(tuple values);
session.execute(boundStatement);
might be different from the one you initialized
preparedStatement = session.prepare(queryString);
even within the same thread of execution. Or you might be attempting to execute the PreparedStatement
with a different Session
than the one used to initialize it.
Here are some things you should be doing when using CQL drivers.
Is a prepared statement bound on one session or is it useable on another session?
A prepared statement is derived from a particular session instance. So when you prepare a statement and it is sent over to the server, it is sent to the cluster with which this session instance is associated with.
The javadoc of Session
states
Session instances are thread-safe and usually a single instance is enough per application.
Upvotes: 5