Reputation: 1593
We are using boto3 with DynamoDB and need to do a full scan of our tables. Based on other posts, that requires pagination, but we have been unable to find a working pagination sample. Here is what we did.
import boto3
client_setting = boto3.client('dynamodb', region_name='ap-southeast-2')
paginator = client_setting.get_paginator('scan')
esk = {}
data = []
unconverted_ga = ourQuery(params1, params2)
for page in unconverted_ga:
    data.append(page)
    esk = page['LastEvaluatedKey']
We don't know exactly how to pass esk as the ExclusiveStartKey of our next query. What is the expected value of the ExclusiveStartKey parameter? We are still new to DynamoDB and there are many things we need to learn, including this. Thanks!
Upvotes: 12
Views: 39450
Reputation: 8513
Late answer, but here is a reusable function that you can call just like the native boto3 methods; it handles the pagination and streams the results to you. One noteworthy thing: the type hints require boto3-stubs; otherwise you can delete them.
from typing import Generator, Unpack
from mypy_boto3_dynamodb.service_resource import Table
from mypy_boto3_dynamodb.type_defs import ScanInputTableScanTypeDef
def scan(table: Table, **kwargs: Unpack[ScanInputTableScanTypeDef]) -> Generator[dict, None, None]:
    """ Wrapper for scan that handles the pagination for you """
    page = {'LastEvaluatedKey': '--'}
    while page.get("LastEvaluatedKey"):
        page = table.scan(**kwargs)
        for item in page['Items']:
            yield item
        kwargs['ExclusiveStartKey'] = page.get('LastEvaluatedKey')
Here is an example usage:
import boto3
from boto3.dynamodb.conditions import Key
ddb = boto3.resource('dynamodb')
table = ddb.Table('some-table-name')
for item in scan(
    table,
    FilterExpression=Key('foo').eq('bar')
):
    print(item)
And as a bonus, here is the same style wrapper for the query function.
from typing import Generator, Unpack
from mypy_boto3_dynamodb.service_resource import Table
from mypy_boto3_dynamodb.type_defs import QueryInputTableQueryTypeDef
def query(table: Table, **kwargs: Unpack[QueryInputTableQueryTypeDef]) -> Generator[dict, None, None]:
    """ Wrapper for query that handles the pagination for you """
    page = {'LastEvaluatedKey': '--'}
    while page.get("LastEvaluatedKey"):
        page = table.query(**kwargs)
        for item in page['Items']:
            yield item
        kwargs['ExclusiveStartKey'] = page.get('LastEvaluatedKey')
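Because these wrappers are generators, pages are only fetched as the caller consumes items. Here is a minimal sketch of that behaviour. Note that FakeTable is a hypothetical in-memory stand-in (not part of boto3) and stream_scan is a simplified, untyped variant of the wrapper above, so treat this as an illustration rather than production code.

```python
from itertools import islice


class FakeTable:
    """Hypothetical stand-in for a boto3 Table; serves rows in fixed-size pages."""

    def __init__(self, rows, page_size):
        self.rows = rows
        self.page_size = page_size
        self.calls = 0  # number of scan() requests made so far

    def scan(self, ExclusiveStartKey=None, **kwargs):
        self.calls += 1
        # In this fake, the key value 'pk' doubles as the row's list index.
        start = 0 if ExclusiveStartKey is None else ExclusiveStartKey['pk'] + 1
        page = self.rows[start:start + self.page_size]
        response = {'Items': page}
        if start + self.page_size < len(self.rows):
            response['LastEvaluatedKey'] = {'pk': page[-1]['pk']}
        return response


def stream_scan(table, **kwargs):
    """Yield items one by one, requesting the next page only when needed."""
    while True:
        page = table.scan(**kwargs)
        yield from page['Items']
        if 'LastEvaluatedKey' not in page:
            return
        kwargs['ExclusiveStartKey'] = page['LastEvaluatedKey']


table = FakeTable([{'pk': i} for i in range(10)], page_size=4)
first_five = list(islice(stream_scan(table), 5))
```

Taking only five items out of ten triggers two page requests here instead of three, which is the practical benefit of streaming over collecting everything into a list first.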
Upvotes: 0
Reputation: 11355
From the answer by Tay B at https://stackoverflow.com/a/38619425/3176550
import boto3
dynamodb = boto3.resource(
    'dynamodb',
    aws_session_token=aws_session_token,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region
)
table = dynamodb.Table('widgetsTableName')
response = table.scan()
data = response['Items']
while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    data.extend(response['Items'])
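To address the original question directly: the expected value of ExclusiveStartKey is simply the LastEvaluatedKey dict from the previous response, passed back unchanged. It holds the primary key of the last item evaluated (plain Python values with the resource API, AttributeValue maps such as {'id': {'S': 'abc'}} with the low-level client). The sketch below shows the loop against a fake scan function so it can run without AWS; make_fake_scan and the 'id' key name are assumptions made up for the illustration.

```python
from typing import Any, Callable, Dict, List


def scan_all(scan_fn: Callable[..., Dict[str, Any]], **kwargs: Any) -> List[Dict[str, Any]]:
    """Collect every item by feeding LastEvaluatedKey back as ExclusiveStartKey."""
    items: List[Dict[str, Any]] = []
    while True:
        response = scan_fn(**kwargs)
        items.extend(response.get('Items', []))
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            # No LastEvaluatedKey means the final page has been read.
            return items
        kwargs['ExclusiveStartKey'] = last_key


def make_fake_scan(rows: List[Dict[str, Any]], page_size: int) -> Callable[..., Dict[str, Any]]:
    """Hypothetical stand-in for table.scan that pages through `rows` by 'id'."""
    def fake_scan(ExclusiveStartKey=None, **_ignored):
        start = 0
        if ExclusiveStartKey is not None:
            # Resume right after the row whose key matches the start key.
            start = next(i for i, row in enumerate(rows)
                         if row['id'] == ExclusiveStartKey['id']) + 1
        page = rows[start:start + page_size]
        response = {'Items': page}
        if start + page_size < len(rows):
            response['LastEvaluatedKey'] = {'id': page[-1]['id']}
        return response
    return fake_scan


rows = [{'id': n} for n in range(7)]
all_items = scan_all(make_fake_scan(rows, page_size=3))
```

With a real table you would call scan_all(table.scan, ...) and pass any filter or projection arguments as keyword arguments, so they are repeated on every page request.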
Upvotes: 7
Reputation: 41
Bit more verbose but I like it.
def fetch_from_table(last_key=None):
    if last_key:
        response = table.query(
            IndexName='advertCatalogIdx',
            KeyConditionExpression=Key('sk').eq('CATALOG'),
            Limit=5,
            ExclusiveStartKey=last_key
        )
    else:
        response = table.query(
            IndexName='advertCatalogIdx',
            KeyConditionExpression=Key('sk').eq('CATALOG'),
            Limit=5
        )
    # print(response)
    for item in response['Items']:
        print(item['address'])
    print('***************************')
    return response.get('LastEvaluatedKey')

last_key = fetch_from_table()
while last_key is not None:
    print("Running again : ")
    last_key = fetch_from_table(last_key)
Upvotes: 1
Reputation: 31
import sys
import boto3
client = boto3.client('dynamodb')
marker = None
while True:
    paginator = client.get_paginator('list_tables')
    page_iterator = paginator.paginate(
        PaginationConfig={
            'MaxItems': 1000,
            'PageSize': 100,
            'StartingToken': marker})
    for page in page_iterator:
        tables = page['TableNames']
        for table in tables:
            print(table)
    try:
        marker = page['NextToken']
    except KeyError:
        sys.exit()
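The same paginator mechanism also exists for scan, which is what the question was attempting: get_paginator('scan') returns a paginator whose paginate() call takes the usual scan arguments and tracks ExclusiveStartKey for you. A minimal sketch, where scan_items is a hypothetical helper name and the table name is a placeholder:

```python
from typing import Any, Dict, Iterator


def scan_items(client: Any, table_name: str, **scan_kwargs: Any) -> Iterator[Dict[str, Any]]:
    """Yield every item of a table using the client's built-in 'scan' paginator."""
    paginator = client.get_paginator('scan')
    # paginate() issues one Scan request per page and passes the previous
    # LastEvaluatedKey along as ExclusiveStartKey behind the scenes.
    for page in paginator.paginate(TableName=table_name, **scan_kwargs):
        yield from page['Items']


# Usage against a real table (placeholder names, needs AWS credentials):
#   import boto3
#   client = boto3.client('dynamodb', region_name='ap-southeast-2')
#   for item in scan_items(client, 'some-table-name'):
#       print(item)
```

Keep in mind that the low-level client returns items in AttributeValue form, for example {'id': {'S': 'abc'}}, rather than as plain Python values.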
Upvotes: 3
Reputation: 8254
You can try the following code:
esk = None
while True:
    scan_generator = YourTableName.scan(max_results=10, exclusive_start_key=esk)
    for item in scan_generator:
        pass  # your code for processing
    # condition to check if entire table is scanned
    else:
        break
    # Load the last keys
    esk = scan_generator.kwargs['exclusive_start_key'].values()
Here is the reference documentation link.
Hope that helps
Upvotes: 7
Reputation: 1593
After an hour of searching, I finally found a better solution. Those who are new to DynamoDB shouldn't miss this: http://docs.aws.amazon.com/amazondynamodb/latest/gettingstartedguide/GettingStarted.Python.04.html
from __future__ import print_function # Python 2/3 compatibility
import boto3
import json
import decimal
from boto3.dynamodb.conditions import Key, Attr
# Helper class to convert a DynamoDB item to JSON.
class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if o % 1 > 0:
                return float(o)
            else:
                return int(o)
        return super(DecimalEncoder, self).default(o)
dynamodb = boto3.resource('dynamodb', region_name='us-west-2', endpoint_url="http://localhost:8000")
table = dynamodb.Table('Movies')
fe = Key('year').between(1950, 1959)
pe = "#yr, title, info.rating"
# Expression Attribute Names for Projection Expression only.
ean = { "#yr": "year", }
esk = None
response = table.scan(
    FilterExpression=fe,
    ProjectionExpression=pe,
    ExpressionAttributeNames=ean
)
for i in response['Items']:
    print(json.dumps(i, cls=DecimalEncoder))
# As long as LastEvaluatedKey is in the response, there are still items left to scan
while 'LastEvaluatedKey' in response:
    response = table.scan(
        ProjectionExpression=pe,
        FilterExpression=fe,
        ExpressionAttributeNames=ean,
        ExclusiveStartKey=response['LastEvaluatedKey']
    )
    for i in response['Items']:
        print(json.dumps(i, cls=DecimalEncoder))
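A side note on the DecimalEncoder helper: the resource API returns DynamoDB numbers as decimal.Decimal, which json.dumps cannot serialize on its own. The o % 1 > 0 test above also misclassifies negative fractions, since Decimal('-7.5') % 1 is -0.5, so the value would be truncated to an int. Below is a hedged variant that compares against the integral value instead; the sample item is made up for illustration.

```python
import json
from decimal import Decimal


class DecimalEncoder(json.JSONEncoder):
    """Serialize Decimal values as int when whole, otherwise as float."""

    def default(self, o):
        if isinstance(o, Decimal):
            # to_integral_value() rounds to a whole number; equality means
            # the original value had no fractional part.
            return int(o) if o == o.to_integral_value() else float(o)
        return super().default(o)


# Made-up item shaped like what table.scan() returns for numeric attributes.
sample_item = {'year': Decimal('1955'), 'rating': Decimal('-7.5')}
encoded = json.dumps(sample_item, cls=DecimalEncoder)
print(encoded)  # {"year": 1955, "rating": -7.5}
```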
Upvotes: 8