Jochen

Reputation: 95

How can we avoid losing writes when using lightweight transactions (CAS) in Cassandra?

I'm doing some tests on Cassandra, to see if we could use it for a scalable key-value store which supports optimistic concurrency.

Since a key-value store only needs a single table and each item is accessed by key, it seems that lightweight transactions could easily provide the technical foundation for our problem.

However, when running a test that performs a number of concurrent updates (and retries whenever a conflicting update is detected), we see that we lose writes.

The test creates a table:

CREATE TABLE objects (key text, version int, PRIMARY KEY(key));

And inserts a number of keys using:

INSERT INTO objects (key, version) VALUES (?, 0) IF NOT EXISTS;

The version of these items is then incremented a number of times using a CAS operation:

-- client retrieves the current version
SELECT version FROM objects WHERE key = ?;

-- and updates the item using the retrieved version as version check
UPDATE objects SET version = ? WHERE key = ? IF version = ?;

The client code actually looks like this for the update:

private async Task<bool> CompareAndSet(string key, int currentCount, PreparedStatement updateStatement)
{
    // increment the version
    IStatement statement = updateStatement.Bind(currentCount + 1, key, currentCount);

    // execute the statement
    RowSet result = await Session.ExecuteAsync(statement);

    // check the result
    Row row = result.GetRows().SingleOrDefault();

    if (row == null)
        throw new Exception("No row in update result.");

    // check if the CAS operation was applied or not
    return row.GetValue<bool>("[applied]");
}

As you can see, the CAS operation may not be applied if another client updated the item concurrently, so the operation is retried until it succeeds. Write timeout exceptions are also handled. The rationale behind handling the write timeout exceptions is explained here.

private async Task Update(string key, PreparedStatement selectStatement, PreparedStatement updateStatement)
{
    bool done = false;

    // try update (increase version) until it succeeds
    while (!done)
    {
        // get current version                
        TestItem item = null;

        while (item == null)
            item = await GetItem(key, selectStatement);

        try
        {
            // update version using lightweight transaction 
            done = await CompareAndSet(key, item.Version, updateStatement);

            // lightweight transaction (CAS) failed, because compare failed --> simply not updated
            if (!done)
                Interlocked.Increment(ref abortedUpdates);
        }
        catch (WriteTimeoutException wte)
        {
            // partial write timeout (some have been updated, so all must be eventually updated, because it is a CAS operation)
            if (wte.ReceivedAcknowledgements > 0)
            {
                Interlocked.Increment(ref partialWriteTimeouts);
                done = true;
            }
            else
                // complete write timeout --> unsure about this one...
                Interlocked.Increment(ref totalWriteTimeouts);
        }
    }
}

Here's the output for a test which uses 100 items and updates each item 10 times:

Running test with 100 items and 10 updates per item.

Number of updates: 1000
Number of aborted updates due to concurrency: 3485
Number of total write timeouts: 18
Number of partial write timeouts: 162

LOST WRITES: 94 (or 9,40%)

Results: 

Updates | Item count
     10 |         35
      9 |         43
      8 |         17
      7 |          3
      6 |          2

Xunit.Sdk.EqualExceptionAssert.Equal() Failure
Expected: 0
Actual:   94

As you can see, this is a highly concurrent test (note the number of aborted operations, each of which forces a retry). The bad news is that we are losing writes: the client believes 1000 updates were performed, but 94 writes were lost in this case.

The number of lost writes is of the same order of magnitude as the number of write timeouts, so the two seem to be linked. The question is: how can we avoid losing these writes when using lightweight transactions?

Upvotes: 2

Views: 2227

Answers (1)

Martin Kersten

Reputation: 5513

The WriteTimeoutException indicates that Cassandra was not able to perform the operation in time. Your test puts Cassandra under heavy load, so any operation may fail with a timeout exception. What you need to do is redo the operation and recover from the problem by retrying. It is similar to an SQLTimeoutException, which you also need to defend against.
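To illustrate why a timed-out CAS should be re-checked rather than counted as done, here is a minimal in-memory sketch. It does not use the Cassandra driver: FakeRow, UnknownOutcomeException and CasDemo are hypothetical names invented for this example, and the simulated row has a single writer. The assumption is that a timeout leaves the outcome unknown, so the client re-reads the version before deciding whether to retry. Against a real cluster that re-read would probably need to be a SERIAL-consistency read, and with several concurrent writers the version alone may not tell you whose write landed.

```csharp
using System;

// Hypothetical stand-in for a CAS write whose outcome never reached the client.
class UnknownOutcomeException : Exception { }

// In-memory stand-in for a single row; this is not a Cassandra driver API.
class FakeRow
{
    private readonly Random random;
    public int Version { get; private set; }

    public FakeRow(int seed) { random = new Random(seed); }

    // Mimics: UPDATE objects SET version = ? WHERE key = ? IF version = ?
    public bool CompareAndSet(int expected, int next)
    {
        int fate = random.Next(4);
        if (fate == 0) throw new UnknownOutcomeException(); // timed out, not applied
        bool applied = Version == expected;
        if (applied) Version = next;
        if (fate == 1) throw new UnknownOutcomeException(); // timed out, but applied
        return applied;
    }
}

static class CasDemo
{
    // Performs `updates` increments and returns the version that was actually stored.
    public static int Run(int updates, bool verifyAfterTimeout, int seed = 1)
    {
        var row = new FakeRow(seed);
        for (int i = 0; i < updates; i++)
        {
            bool done = false;
            while (!done)
            {
                int seen = row.Version;
                try
                {
                    done = row.CompareAndSet(seen, seen + 1);
                }
                catch (UnknownOutcomeException)
                {
                    // Either assume the write landed, or re-read and compare.
                    done = !verifyAfterTimeout || row.Version == seen + 1;
                }
            }
        }
        return row.Version;
    }

    static void Main()
    {
        Console.WriteLine("assume applied after timeout: " + Run(1000, false));
        Console.WriteLine("re-read after timeout:        " + Run(1000, true));
    }
}
```

In this simulation the run that assumes a timed-out write was applied ends with a stored version below 1000, while the run that re-reads before deciding ends at exactly 1000. That mirrors the link between timeouts and lost writes seen in the test output, but only under the simplified single-writer assumption stated above.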

Upvotes: 3
