Mark J Miller
Mark J Miller

Reputation: 4871

DynamoDB javascript SDK batchWriteItem doesn't complete unless I increase write capacity

I'm running a series of unit tests (node.js 4.x, aws-sdk, mocha) which load data into a table before each test then clears the table after the test.

I have two tests that are failing because of a ConditionExpression which triggers a ConditionCheckFailedException. But if I increase the read/write capacity the tests pass.

It's my understanding that the SDK handles throttling exceptions and retries them for you so why wouldn't my tests just run slower and pass? Instead it seems as though the tests fail to complete the scan -> batchWriteItem process and so there are records still left in the table when a new test starts.

I'm told by team members that they've seen similar problems and they just increased the throughput to fix the problem. This doesn't sit right with me. Either I'm doing something wrong and there's a race condition with my tests or there should be a pattern I can implement to make sure that my operations complete when being throttled? I should be able to use throttling metrics to inform when I need to increase throughput but I should still be able to keep retrying until I run out of memory.

Has anyone else run into this and what have you done to handle the problem?

Upvotes: 0

Views: 2047

Answers (2)

Jimmy_m
Jimmy_m

Reputation: 1598

Also take a look at the amount of memory provisioned for the Lambda. It might be too low, and hitting the maximum leads to unpredictable results, in my experience.

Upvotes: 0

Mark J Miller
Mark J Miller

Reputation: 4871

After some debugging I noticed the UnprocessedItems response element. After looking up UnprocessedItems in the docs I realize I should have read more closely. The code below will run a retry loop with a delay (exponential back-off):

/**
 * Deletes every item from a DynamoDB table by scanning pages of keys and
 * issuing batchWriteItem delete requests, re-submitting any
 * UnprocessedItems with exponential back-off until the table is empty.
 *
 * @param {string} tableName - name of the table to clear
 * @param {Object} client - low-level AWS DynamoDB service client
 * @param {function(Error=)} cleared - node-style callback invoked on completion
 */
var clearEventTable = function (tableName, client, cleared) {
  var exclusiveStartKey = null;
  var retryCount = 0;

  // Scan one page of up to 25 keys and shape them into a batchWriteItem
  // request; passes an empty object through when the page has no items.
  var read = function(query, callback) {
    client.scan(query, function (err, page) {
      if(err) {
        console.log(err);
        return callback(err);
      }

      retryCount = 0;
      // LastEvaluatedKey is only present when more pages remain.
      exclusiveStartKey = page.LastEvaluatedKey || null;
      if(page.Count === 0) {
        return callback(null, {});
      }

      // A short page while more data remains suggests the scan was throttled.
      if(page.Count < 25 && exclusiveStartKey) {
        console.log("read capacity limit reached: " + JSON.stringify(page, null, 2));
      }

      var keys = _.map(page.Items, function(n) {
        return { DeleteRequest: { Key: n } };
      });

      var batch = {
        RequestItems: {},
        ReturnConsumedCapacity: "INDEXES",
        ReturnItemCollectionMetrics: "SIZE"
      };

      batch.RequestItems[tableName] = keys;

      callback(null, batch);
    });
  };

  // Issue the batch delete; if DynamoDB returns UnprocessedItems (throttling),
  // retry just those items after an exponentially growing delay.
  var write = function(batch, callback) {
    if(batch && batch.RequestItems){
      client.batchWriteItem(batch, function(err, result) {
        if(err) {
          console.log(err);
          return callback(err);
        }

        if(Object.keys(result.UnprocessedItems).length !== 0) {
          console.log("Retry batchWriteItem: " + JSON.stringify(result, null, 2));
          retryCount++;
          var retry = {
            RequestItems: result.UnprocessedItems,
            ReturnConsumedCapacity: "INDEXES",
            ReturnItemCollectionMetrics: "SIZE"
          };
          // Exponential back-off: 50ms, 100ms, 200ms, ...
          // (retryCount is always >= 1 here, so no zero-delay branch needed.)
          var delay = 50 * Math.pow(2, retryCount - 1);
          // BUG FIX: setTimeout must receive a function reference. The
          // original `setTimeout(write(retry, callback), delay)` invoked
          // write immediately (no delay at all) and scheduled its
          // undefined return value.
          setTimeout(function () {
            write(retry, callback);
          }, delay);
          return;
        }

        callback(null, result);
      });
    } else {
      // Empty page: nothing to delete.
      callback(null);
    }
  };

  var params = {
    TableName: tableName,
    ProjectionExpression: "aggregateId,id",
    Limit: 25, // batchWriteItem accepts at most 25 requests per call
    ConsistentRead: false,
    ReturnConsumedCapacity: "TOTAL"
  };

  async.doWhilst(function (next) {
    // Resume the scan where the previous page left off.
    if (exclusiveStartKey)
      params.ExclusiveStartKey = exclusiveStartKey;

    async.compose(write, read)(params, function (err, result) {
      if (err) next(err);
      else next(null, result);
    });
  }, function () {
    // Keep looping while the scan reports more pages.
    return exclusiveStartKey !== null;
  }, function (err, r) {
    // Surface any error to the caller; otherwise signal success.
    if (err) {
      console.log(err);
      return cleared(err);
    }
    return cleared(null);
  });
};

Upvotes: 3

Related Questions