Reputation: 95
I'm doing some tests on Cassandra, to see if we could use it for a scalable key-value store which supports optimistic concurrency.
Since a key-value store only needs a single table and each item is accessed by key, it seems that lightweight transactions could easily provide the technical foundation for our problem.
However, when running a test that performs a number of concurrent updates (retrying whenever concurrency is detected), we see that writes are lost.
The test creates a table:
CREATE TABLE objects (key text, version int, PRIMARY KEY(key));
And inserts a number of keys using:
INSERT INTO objects (key, version) VALUES (?, 0) IF NOT EXISTS;
The version of these items is then incremented a number of times using a CAS operation:
-- client retrieves the current version
SELECT version FROM objects WHERE key = ?;
-- and updates the item using the retrieved version as version check
UPDATE objects SET version = ? WHERE key = ? IF version = ?;
The client code actually looks like this for the update:
private async Task<bool> CompareAndSet(string key, int currentCount, PreparedStatement updateStatement)
{
    // increment the version, using the retrieved version as the CAS check
    IStatement statement = updateStatement.Bind(currentCount + 1, key, currentCount);
    // execute the statement
    RowSet result = await Session.ExecuteAsync(statement);
    // check the result
    Row row = result.GetRows().SingleOrDefault();
    if (row == null)
        throw new Exception("No row in update result.");
    // check if the CAS operation was applied or not
    return row.GetValue<bool>("[applied]");
}
As you can see, the CAS operation may fail to be applied because of a concurrent update; in that case the operation is retried until it succeeds. Write timeout exceptions are also handled; the rationale behind handling them is explained here.
private async Task Update(string key, PreparedStatement selectStatement, PreparedStatement updateStatement)
{
    bool done = false;
    // try update (increase version) until it succeeds
    while (!done)
    {
        // get current version
        TestItem item = null;
        while (item == null)
            item = await GetItem(key, selectStatement);
        try
        {
            // update version using lightweight transaction
            done = await CompareAndSet(key, item.Version, updateStatement);
            // lightweight transaction (CAS) failed, because compare failed --> simply not updated
            if (!done)
                Interlocked.Increment(ref abortedUpdates);
        }
        catch (WriteTimeoutException wte)
        {
            // partial write timeout (some replicas have acknowledged, so all must
            // eventually be updated, because it is a CAS operation)
            if (wte.ReceivedAcknowledgements > 0)
            {
                Interlocked.Increment(ref partialWriteTimeouts);
                done = true;
            }
            else
            {
                // complete write timeout --> unsure about this one...
                Interlocked.Increment(ref totalWriteTimeouts);
            }
        }
    }
}
Here's the output for a test which uses 100 items and updates each item 10 times:
Running test with 100 items and 10 updates per item.
Number of updates: 1000
Number of aborted updates due to concurrency: 3485
Number of total write timeouts: 18
Number of partial write timeouts: 162
LOST WRITES: 94 (or 9,40%)
Results:
Updates | Item count
10 | 35
9 | 43
8 | 17
7 | 3
6 | 2
Xunit.Sdk.EqualException: Assert.Equal() Failure
Expected: 0
Actual: 94
As you can see, this is a highly concurrent test (see the number of aborted operations, where the update had to be retried). The bad news, however, is that we are losing writes. The client believes it performed 1000 updates, but 94 writes were lost in this case.
The number of lost writes is of the same order of magnitude as the number of write timeouts, so the two seem to be linked. The question is: why are these writes lost, and how can we prevent it?
Upvotes: 2
Views: 2227
Reputation: 5513
The WriteTimeoutException indicates that Cassandra was not able to complete the operation in time. Your test puts Cassandra under heavy load, so any operation may fail with a timeout exception. What you need to do is retry the operation and recover from the failure. It is similar to an SQLTimeoutException, which you also need to defend against.
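Note that a blind retry is not enough for a CAS write: a timeout leaves the outcome unknown, so the update may or may not already have been applied. A safer pattern is to re-read the row after a timeout and decide from the observed version whether to retry. Here is a minimal sketch, reusing the `GetItem` and `CompareAndSet` helpers from your question (not tested against a live cluster; the version check is an assumption for illustration):

```csharp
// Sketch only, assuming the GetItem / CompareAndSet helpers from the question.
// On a write timeout the outcome of the CAS is unknown, so re-read the row
// instead of guessing from the number of acknowledgements.
private async Task UpdateWithRecovery(string key, PreparedStatement selectStatement, PreparedStatement updateStatement)
{
    bool done = false;
    while (!done)
    {
        // get current version
        TestItem item = null;
        while (item == null)
            item = await GetItem(key, selectStatement);
        try
        {
            done = await CompareAndSet(key, item.Version, updateStatement);
        }
        catch (WriteTimeoutException)
        {
            // Ambiguous outcome: re-read and check whether the version we
            // tried to write is now visible. Caveat: with a bare counter,
            // a concurrent client's increment is indistinguishable from our
            // own, so this check is a heuristic unless each client writes a
            // distinguishing value alongside the version.
            TestItem current = await GetItem(key, selectStatement);
            done = current != null && current.Version == item.Version + 1;
        }
    }
}
```

If your driver supports it, issuing the re-read at serial consistency (in the DataStax C# driver, `statement.SetConsistencyLevel(ConsistencyLevel.Serial)`) makes it observe the latest committed Paxos state rather than a possibly stale replica.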
Upvotes: 3