Reputation: 13321
I have working code that reads a CSV file from S3, groups every 25 rows into a BatchWriteItem
DynamoDB request, and sends it. The BatchWrite
often returns success with UnprocessedItems
containing some of the items (not all 25). A subsequent resubmit may also fail, partially or completely. I want to implement exponential backoff when sending the follow-up requests, but every library I have found assumes the retried task is the same each time. In my case, the items may or may not be the same as the ones in the previous request.
I am not very familiar with Node.js. Is there a library or pattern for retrying a task with a (different) context on each attempt?
I am using AWS Lambda, so I cannot rely on global variables.
Helper function that writes to DDB with one retry:
// batchwrite to DDB
var AWS = require('aws-sdk');
var util = require('util');
var dynamodb = new AWS.DynamoDB();

function batchWriteDDB(params) {
    dynamodb.batchWriteItem(params, function(err, data) {
        if (err) {
            console.error("Batchwrite failed: " + err, err.stack);
        } else {
            var unprocessed = data.UnprocessedItems;
            if (Object.keys(unprocessed).length === 0) {
                console.log("Processed all items.");
            } else {
                // some unprocessed items, resubmit them once
                console.warn("Batchwrite did not complete: " + util.inspect(unprocessed, { showHidden: false, depth: null }));
                console.log("Retry batchwriting...");
                var params2 = { RequestItems: data.UnprocessedItems };
                dynamodb.batchWriteItem(params2, function(error, data2) {
                    if (error) {
                        console.error("Retry failed: " + error, error.stack);
                    } else {
                        var unprocessed2 = data2.UnprocessedItems;
                        if (Object.keys(unprocessed2).length === 0) {
                            console.log("Retry processed all items.");
                        } else {
                            console.error("Failed AGAIN to complete: " + util.inspect(unprocessed2, { showHidden: false, depth: null }));
                        }
                    }
                });
            }
        }
    });
}
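Conceptually, what I am after is something like the sketch below (hypothetical; retryBatchWrite and the delay numbers are my own invention), where each retry receives a new context, namely the UnprocessedItems of the previous attempt:
// Hypothetical sketch: each retry gets a *new* context (the previous
// attempt's UnprocessedItems), with an exponentially growing delay.
function retryBatchWrite(requestItems, attempt, maxAttempts) {
    dynamodb.batchWriteItem({ RequestItems: requestItems }, function(err, data) {
        if (err) {
            console.error("Batchwrite failed: " + err);
            return;
        }
        var unprocessed = data.UnprocessedItems;
        if (Object.keys(unprocessed).length === 0) {
            console.log("Processed all items.");
        } else if (attempt >= maxAttempts) {
            console.error("Gave up with unprocessed items left.");
        } else {
            var delay = Math.pow(2, attempt) * 100; // 100 ms, 200 ms, 400 ms, ...
            setTimeout(function() {
                // note: the retried context differs from the original request
                retryBatchWrite(unprocessed, attempt + 1, maxAttempts);
            }, delay);
        }
    });
}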
Upvotes: 11
Views: 7317
Reputation: 1368
Below is a recursive way of handling unprocessed items written to DynamoDB.
var AWS = require('aws-sdk');
// logger is assumed to be your configured logging utility (e.g. winston or console)

var batchWrite = function (items, table, callback) {
    logger.info('batchWrite initial length of items: ' + items.length);
    table = table || 'Merchants';
    var attempt = 0;
    var batchCount = 0;
    var dynamo = new AWS.DynamoDB({ apiVersion: '2012-08-10' });
    while (items.length > 0) {
        // Build a fresh params object for each batch so in-flight requests
        // are not affected when the next batch is assembled
        var params = { RequestItems: {} };
        params.RequestItems[table] = [];
        // Pull off up to 25 items from the list
        for (var i = 0; i < 25; i++) {
            // Nothing else to add to the batch if the input list is empty
            if (items.length === 0) {
                break;
            }
            // Take an item from the list and add it to the current batch
            var item = items.pop();
            params.RequestItems[table].push(item);
        }
        // Kick off this batch of requests
        logger.info("Calling BatchWriteItem with a new batch of "
            + params.RequestItems[table].length + " items");
        dynamo.batchWriteItem(params, doBatchWriteItem);
        batchCount++;
        logger.info("batchCount increased to " + batchCount);
    }
    // A callback that repeatedly calls BatchWriteItem until all of the writes have completed
    function doBatchWriteItem(err, data) {
        batchCount--;
        if (err) {
            logger.info(err); // an error occurred
            if (batchCount === 0) {
                callback(err, data);
            }
        } else {
            console.dir(data);
            if (('UnprocessedItems' in data) && (table in data.UnprocessedItems)) {
                // More data. Call again with the unprocessed items.
                var retryParams = {
                    RequestItems: data.UnprocessedItems
                };
                attempt++;
                batchCount++;
                logger.info("Calling BatchWriteItem again to retry "
                    + retryParams.RequestItems[table].length + " UnprocessedItems in " + (10 * attempt) + " seconds");
                logger.info("batchCount increased to " + batchCount);
                setTimeout(function () {
                    dynamo.batchWriteItem(retryParams, doBatchWriteItem);
                }, 10000 * attempt);
            } else {
                logger.info("BatchWriteItem processed all items in the batch, batchCount = " + batchCount);
                if (batchCount === 0) {
                    logger.info("batchWrite processed all batches");
                    callback(null, data);
                }
            }
        }
    }
}
Call the batchWrite function with the collection, the table name, and a callback:
batchWrite(collection, 'your-table-name', (err, data) => {
    if (err) {
        logger.info('error');
        return;
    }
    logger.info('success');
});
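Note that the retry delay above grows linearly (10 s × attempt). If you specifically want exponential backoff, as the question asks, the setTimeout delay can be swapped for an exponentially growing one; a minimal sketch, with a cap I picked arbitrarily:
// Exponential backoff with a cap (numbers are illustrative; tune to your workload)
var delayMs = Math.min(Math.pow(2, attempt) * 100, 30000);
setTimeout(function () {
    dynamo.batchWriteItem(retryParams, doBatchWriteItem);
}, delayMs);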
Upvotes: 3
Reputation: 39226
The AWS SDK has built-in support for exponential backoff and retries, which you can configure.
Set the base retry delay for all services to 300 ms:
AWS.config.update({retryDelayOptions: {base: 300}});
// Delays with maxRetries = 3: 300, 600, 1200
Set a custom backoff function to provide the delay values on retries:
AWS.config.update({retryDelayOptions: {customBackoff: function(retryCount) {
  // return the delay in ms; for example, exponential with a 100 ms base
  return Math.pow(2, retryCount) * 100;
}}});
To configure retries for the DynamoDB service specifically:
var dynamodb = new AWS.DynamoDB({maxRetries: 5});
Or with both the retry count and the delay options:
var dynamodb = new AWS.DynamoDB({maxRetries: 5, retryDelayOptions: {base: 300}});
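As a sketch of how this is used end to end (the table and item here are made up), the configured client retries throttled or failed requests automatically, but UnprocessedItems returned in a successful response still have to be resubmitted by your own code:
// Sketch (hypothetical table/keys): the client retries throttled requests
// up to 5 times with exponential backoff, but UnprocessedItems in a
// successful response must still be resubmitted by the caller.
var AWS = require('aws-sdk');
var dynamodb = new AWS.DynamoDB({maxRetries: 5, retryDelayOptions: {base: 300}});

var params = {
    RequestItems: {
        'your-table-name': [
            { PutRequest: { Item: { id: { S: '1' }, name: { S: 'example' } } } }
        ]
    }
};

dynamodb.batchWriteItem(params, function(err, data) {
    if (err) {
        console.error(err); // failed even after the SDK's automatic retries
    } else if (Object.keys(data.UnprocessedItems).length > 0) {
        console.warn('UnprocessedItems left; resubmit them yourself');
    } else {
        console.log('All items written');
    }
});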
Upvotes: 13