CJ_Spaz

Reputation: 1204

Complete scan of DynamoDB with boto3

My table is around 220 MB with 250k records in it. I'm trying to pull all of this data into Python. I realize this needs to be a chunked batch process that is looped through, but I'm not sure how to make each batch start where the previous one left off.

Is there some way to filter my scan? From what I've read, filtering occurs after loading, and loading stops at 1 MB, so I wouldn't actually be able to scan in new objects.

Any assistance would be appreciated.

import boto3
dynamodb = boto3.resource('dynamodb',
    aws_session_token = aws_session_token,
    aws_access_key_id = aws_access_key_id,
    aws_secret_access_key = aws_secret_access_key,
    region_name = region
    )

table = dynamodb.Table('widgetsTableName')

data = table.scan()

Upvotes: 110

Views: 173667

Answers (11)

Sudeep Suddapalli

Reputation: 1

A concrete function that retrieves all the data and passes through whatever scan parameters you need. Hope this helps! Happy coding!

import boto3


def get_all_items_from_db(table_name: str, **scan_kwargs):
    """
    Get all items from the DynamoDB table, following pagination to the end.

    param: table_name: name of the table to scan
    param: scan_kwargs: extra keyword arguments passed to every scan call

    return: the first scan response, with 'Items' extended to hold every page
            (other fields such as 'Count' still reflect the first page only)
    """
    dynamodb = boto3.resource('dynamodb')
    dynamodb_table = dynamodb.Table(table_name)
    response = dynamodb_table.scan(**scan_kwargs)  # Pass required parameters as per needs

    # Get the last evaluated key, which marks where the next page starts
    last_evaluated_key = response.get('LastEvaluatedKey')
    # Each iteration returns up to 1 MB of data
    while last_evaluated_key:
        paginated_response = dynamodb_table.scan(
            **scan_kwargs,
            ExclusiveStartKey=last_evaluated_key,  # Resume from the last evaluated key
        )
        last_evaluated_key = paginated_response.get('LastEvaluatedKey')
        # Extending the result list to include the paginated response
        response['Items'].extend(paginated_response['Items'])
    return response

Upvotes: 0

Alastair McCormack

Reputation: 27704

I can't work out why Boto3 provides high-level resource abstraction but doesn't provide pagination. When it does provide pagination, it's hard to use!

The other answers to this question were good but I wanted a super simple way to wrap the boto3 methods and provide memory-efficient paging using generators:

import typing
import boto3
import boto3.dynamodb.conditions


def paginate_dynamodb_response(dynamodb_action: typing.Callable, **kwargs) -> typing.Generator[dict, None, None]:

    # Using the syntax from https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/python/example_code/dynamodb/GettingStarted/scenario_getting_started_movies.py
    keywords = kwargs

    done = False
    start_key = None

    while not done:
        if start_key:
            keywords['ExclusiveStartKey'] = start_key

        response = dynamodb_action(**keywords)

        start_key = response.get('LastEvaluatedKey', None)
        done = start_key is None

        for item in response.get("Items", []):
            yield item


## Usage ##
dynamodb_res = boto3.resource('dynamodb')
dynamodb_table = dynamodb_res.Table('my-table')

query = paginate_dynamodb_response(
    dynamodb_table.query, # The boto3 method. E.g. query or scan
    # Regular Query or Scan parameters
    #
    # IndexName='myindex' # If required
    KeyConditionExpression=boto3.dynamodb.conditions.Key('id').eq('1234')
)

for x in query:
    print(x)

Upvotes: 3

Pierre D

Reputation: 26201

If you are landing here looking for a paginated scan with some filtering expression(s):

def scan(table, **kwargs):
    response = table.scan(**kwargs)
    yield from response['Items']
    while response.get('LastEvaluatedKey'):
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'], **kwargs)
        yield from response['Items']

Example usage:

import boto3
from boto3.dynamodb.conditions import Attr

table = boto3.Session(...).resource('dynamodb').Table('widgetsTableName')

items = list(scan(table, FilterExpression=Attr('name').contains('foo')))
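
On the worry in the question about filtering: a filter is applied after each page of up to 1 MB has been read, so a page can come back with few or even zero items and still carry a LastEvaluatedKey. The loop therefore has to be driven by LastEvaluatedKey, not by whether Items is empty. Here is a small sketch of that behaviour. FakeTable and its `keep` argument are hypothetical stand-ins that I made up for illustration (they are not part of boto3, and `keep` only plays the role of FilterExpression):

```python
class FakeTable:
    """Hypothetical in-memory stand-in for a DynamoDB Table; NOT part of boto3.

    Each scan call reads `page_size` rows first and filters afterwards,
    imitating how DynamoDB applies a filter after reading a page.
    """

    def __init__(self, rows, page_size):
        self.rows = rows
        self.page_size = page_size

    def scan(self, keep=lambda row: True, ExclusiveStartKey=None):
        # `keep` is an illustrative replacement for FilterExpression.
        start = 0 if ExclusiveStartKey is None else ExclusiveStartKey["pos"]
        end = start + self.page_size
        page = self.rows[start:end]
        response = {"Items": [row for row in page if keep(row)]}
        if end < len(self.rows):
            # More rows remain, so tell the caller where to resume.
            response["LastEvaluatedKey"] = {"pos": end}
        return response


def scan_all(table, **kwargs):
    """Yield every matching item, following LastEvaluatedKey between pages."""
    response = table.scan(**kwargs)
    yield from response["Items"]
    while "LastEvaluatedKey" in response:
        response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"], **kwargs)
        yield from response["Items"]


rows = [{"id": i, "name": "foo" if i >= 4 else "bar"} for i in range(6)]
table = FakeTable(rows, page_size=2)

# The first page matches nothing, yet the scan is not finished.
first_page = table.scan(keep=lambda row: row["name"] == "foo")
matches = list(scan_all(table, keep=lambda row: row["name"] == "foo"))
```

The first page here has an empty Items list but still includes LastEvaluatedKey, and the matching rows only show up on the last page.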

Upvotes: 7

Richard

Reputation: 2625

DynamoDB limits each scan call to 1 MB of data.

Documentation: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.scan

Here is an example loop to get all the data from a DynamoDB table using LastEvaluatedKey:

import boto3
client = boto3.client('dynamodb')

def dump_table(table_name):
    results = []
    last_evaluated_key = None
    while True:
        if last_evaluated_key:
            response = client.scan(
                TableName=table_name,
                ExclusiveStartKey=last_evaluated_key
            )
        else: 
            response = client.scan(TableName=table_name)
        last_evaluated_key = response.get('LastEvaluatedKey')
        
        results.extend(response['Items'])
        
        if not last_evaluated_key:
            break
    return results

# Usage
data = dump_table('your-table-name')

# do something with data

Upvotes: 47

Dan Hook

Reputation: 7077

I had some problems with Vincent's answer related to the transformation being applied to the LastEvaluatedKey and messing up the pagination. Solved as follows:

import boto3

from boto3.dynamodb.types import TypeDeserializer
from boto3.dynamodb.transform import TransformationInjector

client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')
operation_model = client._service_model.operation_model('Scan')
trans = TransformationInjector(deserializer = TypeDeserializer())
operation_parameters = {
  'TableName': 'tablename',  
}
items = []

for page in paginator.paginate(**operation_parameters):
    has_last_key = 'LastEvaluatedKey' in page
    if has_last_key:
        last_key = page['LastEvaluatedKey'].copy()
    trans.inject_attribute_value_output(page, operation_model)
    if has_last_key:
        page['LastEvaluatedKey'] = last_key
    items.extend(page['Items'])
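
If the goal is only to turn the low-level attribute-value format (such as {'S': 'abc'}) into plain Python values, another option is to deserialize each item on its own instead of transforming the whole page, which leaves LastEvaluatedKey untouched. A minimal sketch: `plain_items` is a hypothetical helper name, and the `deserialize` argument is assumed to be something like the `deserialize` method of boto3's TypeDeserializer:

```python
def plain_items(pages, deserialize):
    """Yield items from scan/query pages with each attribute value deserialized.

    Only page['Items'] is read; page['LastEvaluatedKey'] is never modified,
    so a paginator producing `pages` is not disturbed.
    """
    for page in pages:
        for item in page.get("Items", []):
            yield {name: deserialize(value) for name, value in item.items()}


# Intended use with boto3 (assumes credentials and a real table name):
#   import boto3
#   from boto3.dynamodb.types import TypeDeserializer
#   pages = boto3.client('dynamodb').get_paginator('scan').paginate(TableName='tablename')
#   items = list(plain_items(pages, TypeDeserializer().deserialize))
```

With TypeDeserializer, numeric attributes come back as decimal.Decimal values, so convert them if you need int or float.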

Upvotes: 4

Isac Casapu

Reputation: 1301

The two approaches suggested in the other answers both have drawbacks: either writing lengthy and repetitive code that handles paging explicitly in a loop, or using Boto paginators with low-level clients and forgoing the advantages of higher-level Boto objects.

A solution using Python functional code to provide a high-level abstraction allows higher-level Boto methods to be used, while hiding the complexity of AWS paging:

import itertools
import typing

def iterate_result_pages(function_returning_response: typing.Callable, *args, **kwargs) -> typing.Generator:
    """A wrapper for functions using AWS paging, that returns a generator which yields a sequence of items for
    every response

    Args:
        function_returning_response: A function (or callable), that returns an AWS response with 'Items' and optionally 'LastEvaluatedKey'
        This could be a bound method of an object.

    Returns:
        A generator which yields the 'Items' field of the result for every response
    """
    response = function_returning_response(*args, **kwargs)
    yield response["Items"]
    while "LastEvaluatedKey" in response:
        kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
        response = function_returning_response(*args, **kwargs)
        yield response["Items"]

    return

def iterate_paged_results(function_returning_response: typing.Callable, *args, **kwargs) -> typing.Iterator:
    """A wrapper for functions using AWS paging, that returns an iterator of all the items in the responses.
    Items are yielded to the caller as soon as they are received.

    Args:
        function_returning_response: A function (or callable), that returns an AWS response with 'Items' and optionally 'LastEvaluatedKey'
        This could be a bound method of an object.

    Returns:
        An iterator which yields one response item at a time
    """
    return itertools.chain.from_iterable(iterate_result_pages(function_returning_response, *args, **kwargs))

# Example, assuming 'table' is a Boto DynamoDB table object:
all_items = list(iterate_paged_results(table.scan, ProjectionExpression='my_field'))

Upvotes: 5

Abe Voelker

Reputation: 31574

Riffing off of Jordon Phillips's answer, here's how you'd pass a FilterExpression in with the pagination:

import boto3

client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')
operation_parameters = {
  'TableName': 'foo',
  'FilterExpression': 'bar > :x AND bar < :y',
  'ExpressionAttributeValues': {
    ':x': {'S': '2017-01-31T01:35'},
    ':y': {'S': '2017-01-31T02:08'},
  }
}

page_iterator = paginator.paginate(**operation_parameters)
for page in page_iterator:
    for item in page['Items']:
        print(item)  # do something with each item

Upvotes: 38

Taeber

Reputation: 1547

I think the Amazon DynamoDB documentation regarding table scanning answers your question.

In short, you'll need to check for LastEvaluatedKey in the response. Here is an example using your code:

import boto3
dynamodb = boto3.resource('dynamodb',
                          aws_session_token=aws_session_token,
                          aws_access_key_id=aws_access_key_id,
                          aws_secret_access_key=aws_secret_access_key,
                          region_name=region
)

table = dynamodb.Table('widgetsTableName')

response = table.scan()
data = response['Items']

while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    data.extend(response['Items'])

Upvotes: 143

Vincent

Reputation: 111

Code for stripping the DynamoDB type descriptors from the response, as @kungphu mentioned:

import boto3

from boto3.dynamodb.types import TypeDeserializer
from boto3.dynamodb.transform import TransformationInjector

client = boto3.client('dynamodb')
paginator = client.get_paginator('query')
service_model = client._service_model.operation_model('Query')
trans = TransformationInjector(deserializer = TypeDeserializer())
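# Note: paginate() must be given the Query parameters (at minimum TableName
# and a key condition); they are omitted here for brevity.
for page in paginator.paginate():
    trans.inject_attribute_value_output(page, service_model)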

Upvotes: 9

Jordon Phillips

Reputation: 16003

boto3 offers paginators that handle all the pagination details for you. Here is the doc page for the scan paginator. Basically, you would use it like so:

import boto3

client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')

for page in paginator.paginate(TableName='your-table-name'):
    for item in page['Items']:
        print(item)  # do something with each item

Upvotes: 40

CJ_Spaz

Reputation: 1204

Turns out that Boto3 captures the "LastEvaluatedKey" as part of the returned response. This can be used as the start point for a scan:

data= table.scan(
   ExclusiveStartKey=data['LastEvaluatedKey']
)

I plan on building a loop around this that runs until the response no longer contains a LastEvaluatedKey.
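
A minimal sketch of such a loop, assuming `table` is a boto3 DynamoDB Table resource (or anything with a compatible scan method). `scan_everything` is just an illustrative name:

```python
def scan_everything(table, **scan_kwargs):
    """Collect the items from every page of a scan into one list."""
    items = []
    response = table.scan(**scan_kwargs)
    items.extend(response["Items"])
    # A missing LastEvaluatedKey means the final page has been read.
    while "LastEvaluatedKey" in response:
        response = table.scan(
            ExclusiveStartKey=response["LastEvaluatedKey"], **scan_kwargs
        )
        items.extend(response["Items"])
    return items
```

This holds the whole table in memory, which should be workable for a table of a couple hundred MB but is worth keeping in mind for larger ones.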

Upvotes: 5
