Reputation: 11669
I am using datastax java driver 3.1.0 to connect to cassandra cluster and my cassandra cluster version is 2.0.10. I am writing asynchronously with QUORUM consistency.
public void save(final String process, final int clientid, final long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, Executors.newFixedThreadPool(10));
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
And below is my CacheStatement
class:
public class CacheStatement {
private static final Map<String, PreparedStatement> cache =
new ConcurrentHashMap<>();
private static class Holder {
private static final CacheStatement INSTANCE = new CacheStatement();
}
public static CacheStatement getInstance() {
return Holder.INSTANCE;
}
private CacheStatement() {}
public BoundStatement getStatement(String cql) {
Session session = CassUtils.getInstance().getSession();
PreparedStatement ps = cache.get(cql);
// no statement cached, create one and cache it now.
if (ps == null) {
synchronized (this) {
ps = cache.get(cql);
if (ps == null) {
cache.put(cql, session.prepare(cql));
}
}
}
return ps.bind();
}
}
My above save
method will be called from multiple threads and I think BoundStatement
is not thread safe. Btw StatementCache
class is thread safe as shown above.
BoundStatement
is not thread safe. Will there be any problem in my above code if I write asynchronously from multiple threads?Executors.newFixedThreadPool(10)
in the addCallback
parameter. Is this ok or there will be any problem? Or should I use MoreExecutors.directExecutor
. What is the difference between these two then? And what is the best way for this?Below is my connection setting to connect to cassandra using datastax java driver:
Builder builder = Cluster.builder();
cluster =
builder
.addContactPoints(servers.toArray(new String[servers.size()]))
.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE))
.withPoolingOptions(poolingOptions)
.withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy
.builder()
.withLocalDc(
!TestUtils.isProd() ? "DC2" : TestUtils.getCurrentLocation()
.get().name().toLowerCase()).withUsedHostsPerRemoteDc(3).build())
.withCredentials(username, password).build();
Upvotes: 2
Views: 2214
Reputation: 5180
I think what you're doing is fine. You could optimize a bit further by preparing all the statements at application startup, so you have everything already cached, so you don't get any performance hit for preparing statement when "saving", and you don't lock anything in your workflow.
BoundStatement
is not threadsafe, but PreparedStatement
yes, and you are returning a new BoundStatement
every time you call your getStatement
. Indeed, the .bind()
function of the PreparedStatement
is actually a shortcut for new BoundStatement(ps).bind()
. And you are not accessing the same BoundStatement
from multiple thread. So your code is fine.
For thread pool, instead, you are actually creating a new thread pool on each addCallback
function. This is a waste of resources. I don't use this callback method and I prefer managing plain FutureResultSet
by myself, but I saw examples on datastax documentation that use MoreExecutors.sameThreadExecutor()
instead of MoreExecutors.directExecutor()
.
Upvotes: 1