Reputation: 601
A Lambda function gets triggered by SQS messages. the reserved concurrency is set to the maximum which means I can have concurrent Lambda execution. each Lambda will read the SQS message and needs to update a Dynamodb table that holds the sum of message lengths. it's a numeric value that increases. Although I have implemented the optimistic locking, I still see the final value doesn't match with the actual correct summation. any thoughts?
here is the code that does the update:
public async Task Update(T item)
{
using (IDynamoDBContext dbContext = _dataContextFactory.Create())
{
T savedItem = await dbContext.LoadAsync(item);
if (savedItem == null)
{
throw new AmazonDynamoDBException("DynamoService.Update: The item does not exist in the Table");
}
await dbContext.SaveAsync(item);
}
}
Upvotes: 0
Views: 1944
Reputation: 8127
Best to use DynamoDB streams here, and batch writes. Otherwise you will unavoidably have transaction conflicts, probably sitting in some logs somewhere are a bunch of errors. You can also see this cloudwatch metric for your table: TransactionConflict
.
DynamoDB Streams
To perform aggregation, you will need to have a table which has a stream enabled on it. Set the MaximumBatchingWindowInSeconds
& BatchSize
to values which suit your requirements. That is say you need the able to be accurate within 10 seconds, you would set MaximumBatchingWindowInSeconds
to no more than 10. And you might not want to have more than 100 items waiting to be aggregated so set BatchSize=100
. You will create a Lambda function which will process the items coming into your table in the form of:
"TransactItems": [{
"Put": {
"TableName": "protect-your-table",
"Item": {
"id": "123",
"length": 4,
....
You would then iterate over this and sum up the length
attribute, and do an update ADD
statement to a summation in another table, which holds calculated statistics based on the stream. Note you may receive duplicate messages which may cause you errors. You could handle this in Dynamo by making sure you don't write an item if it exists already, or use message deduplication id.
Batching
Make sure you are not processing many many tiny messages one at a time, but instead are batching them together say in your Lambda function which reads form SQS that it can read up to 100 messages at a time and do a batch write. Also set a low concurrency limit on it, so that messages can bank up a little over a couple of seconds.
The reason you want to do this is that you can't actually increment a value in DynamoDB many times a second, it will give you errors and actually slow your processing. You'll find your system as a whole will be performing at a fraction of the cost, be more accurate, and the real time accuracy should be close enough to what you need.
Upvotes: 2