MuntingInsekto

Reputation: 1593

DynamoDB pagination using Boto3

We are using boto3 with DynamoDB and need to do a full scan of our tables. Based on other posts, that requires pagination, but we have been unable to find a working sample. Here is what we tried:

import boto3
client_setting = boto3.client('dynamodb', region_name='ap-southeast-2')
paginator = client_setting.get_paginator('scan')
esk = {}
data = []
unconverted_ga = ourQuery(params1, params2)
for page in unconverted_ga:
    data.append(page)
    esk = page['LastEvaluatedKey']

We don't know exactly how to pass esk as the ExclusiveStartKey of our next query. What is the expected value of the ExclusiveStartKey parameter? We are still new to DynamoDB and there are many things we need to learn, including this. Thanks!

Upvotes: 12

Views: 39450

Answers (6)

Ryan Widmaier

Reputation: 8513

Late answer, but here is a reusable function that you can call just like the native boto3 methods; it handles the pagination and streams the results to you. Some noteworthy things:

from typing import Generator, Unpack

from mypy_boto3_dynamodb.service_resource import Table
from mypy_boto3_dynamodb.type_defs import ScanInputTableScanTypeDef


def scan(table: Table, **kwargs: Unpack[ScanInputTableScanTypeDef]) -> Generator[dict, None, None]:
    """ Wrapper for scan that handles the pagination for you """
    page = {'LastEvaluatedKey': '--'}
    while page.get("LastEvaluatedKey"):
        page = table.scan(**kwargs)
        for item in page['Items']:
            yield item

        kwargs['ExclusiveStartKey'] = page.get('LastEvaluatedKey')

Here is an example usage:

import boto3
from boto3.dynamodb.conditions import Key

ddb = boto3.resource('dynamodb')
table = ddb.Table('some-table-name')

for item in scan(
    table,
    FilterExpression=Key('foo').eq('bar')
):
    print(item)

And as a bonus, here is the same style wrapper for the query function.

from typing import Generator, Unpack

from mypy_boto3_dynamodb.service_resource import Table
from mypy_boto3_dynamodb.type_defs import QueryInputTableQueryTypeDef

def query(table: Table, **kwargs: Unpack[QueryInputTableQueryTypeDef]) -> Generator[dict, None, None]:
    """ Wrapper for query that handles the pagination for you """
    page = {'LastEvaluatedKey': '--'}
    while page.get("LastEvaluatedKey"):
        page = table.query(**kwargs)
        for item in page['Items']:
            yield item

        kwargs['ExclusiveStartKey'] = page.get('LastEvaluatedKey')
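
The scan and query wrappers above differ only in the method they call, so they could share one helper. This is a minimal sketch rather than part of the original answer: `paginate_items` is a hypothetical name, and it assumes the method passed in behaves like `table.scan` or `table.query`, returning a dict with `Items` and, while more pages remain, `LastEvaluatedKey`.

```python
from typing import Any, Callable, Dict, Iterator


def paginate_items(method: Callable[..., Dict[str, Any]], **kwargs: Any) -> Iterator[Dict[str, Any]]:
    """Yield every item from a paginated call such as table.scan or table.query.

    `paginate_items` is a hypothetical helper name; `method` is assumed to
    return a dict with 'Items' and, while more pages remain, 'LastEvaluatedKey'.
    """
    params = dict(kwargs)  # copy so the caller's arguments are not mutated
    while True:
        page = method(**params)
        yield from page.get('Items', [])
        last_key = page.get('LastEvaluatedKey')
        if not last_key:
            break  # no key means the last page has been read
        # Resume the next call right after the last evaluated item.
        params['ExclusiveStartKey'] = last_key
```

With a real table it would be called as `paginate_items(table.scan, FilterExpression=...)` or `paginate_items(table.query, KeyConditionExpression=...)`.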

Upvotes: 0

Jonathan

Reputation: 11355

From the answer by Tay B at https://stackoverflow.com/a/38619425/3176550

import boto3
dynamodb = boto3.resource(
    'dynamodb',
    aws_session_token=aws_session_token,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region
)
    
table = dynamodb.Table('widgetsTableName')

# First page of results.
response = table.scan()
data = response['Items']

# Keep scanning from the last evaluated key until no key is returned.
while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    data.extend(response['Items'])

Upvotes: 7

wikus schalkwyk

Reputation: 41

Bit more verbose but I like it.

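from boto3.dynamodb.conditions import Key

# `table` is assumed to be an existing boto3 DynamoDB Table resource.
def fetch_from_table(last_key=None):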
    if last_key:
        response = table.query(
            IndexName='advertCatalogIdx',
            KeyConditionExpression=Key('sk').eq('CATALOG'),
            Limit=5,
            ExclusiveStartKey=last_key
        )
    else:
        response = table.query(
            IndexName='advertCatalogIdx',
            KeyConditionExpression=Key('sk').eq('CATALOG'),
            Limit=5
        )
        # print(response)

    for item in response['Items']:
        print(item['address'])
        print('***************************')

    return response.get('LastEvaluatedKey')


last_key = fetch_from_table()
while last_key is not None:
    print("Running again : ")
    last_key = fetch_from_table(last_key)
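
The two branches above differ only in whether ExclusiveStartKey is passed, so the call could be built once. A minimal sketch, assuming `table` is any object with a boto3-style `query` method; `fetch_page` is a hypothetical name, not something from the answer or from boto3.

```python
from typing import Any, Dict, List, Optional, Tuple


def fetch_page(table: Any, last_key: Optional[Dict[str, Any]] = None,
               **query_kwargs: Any) -> Tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
    """Fetch one page and return (items, key to resume from, or None when done)."""
    params = dict(query_kwargs)
    if last_key is not None:
        # Only add ExclusiveStartKey when resuming; the first call omits it.
        params['ExclusiveStartKey'] = last_key
    response = table.query(**params)
    return response['Items'], response.get('LastEvaluatedKey')
```

The caller starts with no key and keeps calling with the returned key until it comes back as None, exactly as the loop above does.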

Upvotes: 1

Parag Nagwekar

Reputation: 31

import boto3

client = boto3.client('dynamodb')
paginator = client.get_paginator('list_tables')
marker = None

while True:
    page_iterator = paginator.paginate(
        PaginationConfig={
            'MaxItems': 1000,
            'PageSize': 100,
            'StartingToken': marker})

    for page in page_iterator:
        for table in page['TableNames']:
            print(table)

    # MaxItems caps each pass; resume_token is None once every table is listed.
    marker = page_iterator.resume_token
    if marker is None:
        break
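
The same paginator mechanism is available for `scan`, which is what the question asks about. A minimal sketch, assuming `client` is a boto3 DynamoDB client such as `boto3.client('dynamodb')`; `scan_with_paginator` is a hypothetical helper name. Note that the low-level client returns items in the typed format (for example `{'title': {'S': 'Alien'}}`), unlike the Table resource.

```python
from typing import Any, Dict, Iterator


def scan_with_paginator(client: Any, table_name: str, **scan_kwargs: Any) -> Iterator[Dict[str, Any]]:
    """Yield every item of a table using the client's built-in 'scan' paginator.

    `client` is assumed to be a boto3 DynamoDB client; the helper name is
    hypothetical. Items come back in the low-level typed format.
    """
    paginator = client.get_paginator('scan')
    # The paginator feeds LastEvaluatedKey back as ExclusiveStartKey internally,
    # so no key handling is needed here.
    for page in paginator.paginate(TableName=table_name, **scan_kwargs):
        yield from page.get('Items', [])
```

With this approach there is no need to track ExclusiveStartKey by hand; the loop simply ends when the paginator runs out of pages.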

Upvotes: 3

Harshal Bulsara

Reputation: 8254

You can try the following code:

esk = None

while True:
    scan_generator = YourTableName.scan(max_results=10, exclusive_start_key=esk)
    for item in scan_generator:
        pass  # your code for processing
    # condition to check if entire table is scanned
    else:
        break

    # Load the last keys
    esk = scan_generator.kwargs['exclusive_start_key'].values()

Here is the reference documentation link.

Hope that helps

Upvotes: 7

MuntingInsekto

Reputation: 1593

After an hour of searching, I finally found a better solution. Those who are new to DynamoDB shouldn't miss this: http://docs.aws.amazon.com/amazondynamodb/latest/gettingstartedguide/GettingStarted.Python.04.html

from __future__ import print_function # Python 2/3 compatibility
import boto3
import json
import decimal
from boto3.dynamodb.conditions import Key, Attr

# Helper class to convert a DynamoDB item to JSON.
class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if o % 1 > 0:
                return float(o)
            else:
                return int(o)
        return super(DecimalEncoder, self).default(o)

dynamodb = boto3.resource('dynamodb', region_name='us-west-2', endpoint_url="http://localhost:8000")

table = dynamodb.Table('Movies')

fe = Key('year').between(1950, 1959)
pe = "#yr, title, info.rating"
# Expression Attribute Names for Projection Expression only.
ean = { "#yr": "year", }
esk = None


response = table.scan(
    FilterExpression=fe,
    ProjectionExpression=pe,
    ExpressionAttributeNames=ean
    )

for i in response['Items']:
    print(json.dumps(i, cls=DecimalEncoder))

# As long as LastEvaluatedKey is in the response, there are still more items to scan.
while 'LastEvaluatedKey' in response:
    response = table.scan(
        ProjectionExpression=pe,
        FilterExpression=fe,
        ExpressionAttributeNames= ean,
        ExclusiveStartKey=response['LastEvaluatedKey']
        )

    for i in response['Items']:
        print(json.dumps(i, cls=DecimalEncoder))
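
To make the expected value of ExclusiveStartKey concrete without touching AWS, here is a toy illustration. `FakeTable` and `scan_all` are hypothetical names, and the in-memory class only imitates the paging behaviour described above (it keeps items sorted by a single key, which a real scan does not promise). The point it shows is that ExclusiveStartKey is simply the LastEvaluatedKey dict from the previous response, passed back unchanged.

```python
from typing import Any, Dict, List, Optional


class FakeTable:
    """Hypothetical in-memory stand-in that mimics the paging of Table.scan."""

    def __init__(self, items: List[Dict[str, Any]], key_name: str, page_size: int) -> None:
        self.items = sorted(items, key=lambda item: item[key_name])
        self.key_name = key_name
        self.page_size = page_size

    def scan(self, ExclusiveStartKey: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        start = 0
        if ExclusiveStartKey is not None:
            # Resume just after the item whose key was handed back to us.
            keys = [item[self.key_name] for item in self.items]
            start = keys.index(ExclusiveStartKey[self.key_name]) + 1
        chunk = self.items[start:start + self.page_size]
        response: Dict[str, Any] = {'Items': chunk}
        if start + self.page_size < len(self.items):
            # More items remain: report the key of the last item returned.
            response['LastEvaluatedKey'] = {self.key_name: chunk[-1][self.key_name]}
        return response


def scan_all(table: Any) -> List[Dict[str, Any]]:
    """Collect every item by passing LastEvaluatedKey back unchanged."""
    response = table.scan()
    data = list(response['Items'])
    while 'LastEvaluatedKey' in response:
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        data.extend(response['Items'])
    return data


table = FakeTable([{'title': t} for t in 'abcde'], key_name='title', page_size=2)
first = table.scan()
print(first['LastEvaluatedKey'])   # {'title': 'b'}
print(len(scan_all(table)))        # 5
```

On a real table the key dict holds the table's primary key attributes rather than the single toy attribute used here.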

Upvotes: 8
