Reputation: 637
The following code writes items into DynamoDB in batches:
    with table.batch_writer() as batch:
        for item in chunk:
            dynamodb_item = {
                'itemId': item['key'],
                'time': item['time'],
                'value': item['value']
            }
            batch.put_item(Item=dynamodb_item)
As stated in the following docs, if the batch call fails it returns the unprocessed items: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.batch_write_item
In boto3, how can I get the unprocessed items in the response?
How can I figure out if it's all handled successfully or if the call response has unprocessed items?
Upvotes: 1
Views: 1342
Reputation: 1
Found this question while looking for similar info myself, and thought I'd come back with some extra info for future people.
batch_writer doesn't give you a response with unprocessed items the way batch_write_item does.
However, the batch_writer documentation states:
"In addition, the batch writer will also automatically handle any unprocessed items and resend them as needed."
I had a look at batch_writer's source code in boto3 on GitHub. Under the hood, batch_writer takes the UnprocessedItems from the response to its own call to batch_write_item and adds them back to its buffer for retrying. Relevant section of the source code:
    def _flush(self):
        items_to_send = self._items_buffer[: self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount :]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send}
        )
        unprocessed_items = response['UnprocessedItems']
        if not unprocessed_items:
            unprocessed_items = {}
        item_list = unprocessed_items.get(self._table_name, [])
        # Any unprocessed_items are immediately added to the
        # next batch we send.
        self._items_buffer.extend(item_list)
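To make the re-buffering easier to see, here is a toy stand-in that mirrors the logic quoted above. It is only a sketch, not boto3's actual class: MiniBatchWriter and FlakyClient are hypothetical names I made up, the fake client rejects one item on its first call to simulate UnprocessedItems, and the __exit__ loop is my assumption about how the remaining buffer gets drained when the with block ends.

```python
class MiniBatchWriter:
    """Toy stand-in (not boto3's class) mirroring the re-buffering logic."""

    def __init__(self, client, table_name, flush_amount=25):
        self._client = client
        self._table_name = table_name
        self._flush_amount = flush_amount
        self._items_buffer = []

    def put_item(self, Item):
        self._items_buffer.append({'PutRequest': {'Item': Item}})
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send}
        )
        unprocessed = response.get('UnprocessedItems') or {}
        # Anything that was not accepted goes back into the buffer.
        self._items_buffer.extend(unprocessed.get(self._table_name, []))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # Assumption: keep flushing until the buffer, including any
        # re-queued items, is empty.
        while self._items_buffer:
            self._flush()


class FlakyClient:
    """Hypothetical fake client: rejects the last item of its first call."""

    def __init__(self):
        self.written = []
        self.calls = 0

    def batch_write_item(self, RequestItems):
        self.calls += 1
        (table, requests), = RequestItems.items()
        accepted, rejected = requests, []
        if self.calls == 1:
            accepted, rejected = requests[:-1], requests[-1:]
        self.written.extend(r['PutRequest']['Item'] for r in accepted)
        return {'UnprocessedItems': {table: rejected} if rejected else {}}


client = FlakyClient()
with MiniBatchWriter(client, 'my-table', flush_amount=3) as batch:
    for i in range(3):
        batch.put_item(Item={'itemId': i})

# The rejected item is resent on exit, so all three end up written.
print(client.calls, len(client.written))
```

The caller never sees the rejected item here, which matches the point above: with batch_writer there is no response to inspect, because the retry happens inside the writer.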
Upvotes: 0
Reputation: 7404
Use batch_write_item, as you mentioned, instead of put_item. Check this example:
    # Assumes an async client whose batch_write_item is awaitable,
    # and that this runs inside an async function with asyncio imported.
    response = await client.batch_write_item(
        RequestItems=request_items
    )
    if not response['UnprocessedItems']:
        print('Wrote 25 items to dynamo')
    else:
        await asyncio.sleep(5)
        unprocessed_items = response['UnprocessedItems']
        # proceed with unprocessed_items
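If you'd rather use the regular synchronous client, the "proceed with unprocessed_items" step could look roughly like the sketch below. The function name batch_write_with_retry and the max_attempts and base_delay parameters are my own, not part of boto3. It assumes the client exposes batch_write_item as in the linked docs, that write_requests is already a list of at most 25 entries shaped like {'PutRequest': {'Item': ...}}, and that everything goes to a single table.

```python
import time


def batch_write_with_retry(client, table_name, write_requests,
                           max_attempts=5, base_delay=0.1):
    """Send write requests and resend UnprocessedItems with backoff.

    Returns the write requests still unprocessed after the last attempt;
    an empty list means everything was written.
    """
    pending = list(write_requests)
    for attempt in range(max_attempts):
        if not pending:
            break
        response = client.batch_write_item(
            RequestItems={table_name: pending}
        )
        # UnprocessedItems maps table name -> list of write requests.
        pending = response.get('UnprocessedItems', {}).get(table_name, [])
        if pending and attempt < max_attempts - 1:
            # Exponential backoff before resending what was rejected.
            time.sleep(base_delay * (2 ** attempt))
    return pending
```

An empty return value answers the "was it all handled successfully" part of the question; a non-empty one is exactly the set of requests that still need handling.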
Upvotes: 1