Reputation: 422
The goal is to load a large amount of "bulked" JSONs from S3. I found aiobotocore and felt urged to try it, in the hope of getting more efficiency and at the same time familiarising myself with asyncio. I gave it a shot, and it works, but I know basically nada about asynchronous programming. Therefore, I'm hoping for some improvements/comments. Maybe there are some kind souls out there who can spot some obvious mistakes.
The problem is that boto3 only supports one HTTP request at a time. By utilising a ThreadPool I managed to get significant improvements, but I'm hoping for a more efficient way.
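For reference, the ThreadPool baseline looked roughly like this (a sketch from memory rather than the exact code; the bucket name, pool size, and the pre-listed keys are placeholders):

# Sketch of the ThreadPool baseline: plain blocking boto3 calls fanned
# out over worker threads so the network waits overlap.
from concurrent.futures import ThreadPoolExecutor
import boto3

s3 = boto3.client('s3')  # boto3 clients can be shared across threads

def fetch(key):
    # one blocking GET per task
    body = s3.get_object(Bucket='some-bucket', Key=key)['Body'].read()
    return list(iterload(body.decode()))  # iterload is defined below

with ThreadPoolExecutor(max_workers=20) as pool:
    contents = list(pool.map(fetch, keys))  # 'keys' listed beforehand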
Here is the asyncio code:
Imports:
import os
import asyncio
import aiobotocore
from itertools import chain
import json
from json.decoder import WHITESPACE
Here's a helper generator I found somewhere that yields decoded JSONs from a string containing multiple concatenated JSONs:
def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
    '''helper for parsing individual jsons from string of jsons (stolen from somewhere)'''
    string = str(string_or_fp)
    decoder = cls(**kwargs)
    idx = WHITESPACE.match(string, 0).end()
    while idx < len(string):
        obj, end = decoder.raw_decode(string, idx)
        yield obj
        idx = WHITESPACE.match(string, end).end()
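For example (a quick illustration, not part of the original code):

# quick illustration: two JSONs back to back in one string
blob = '{"a": 1} {"b": 2}'
print(list(iterload(blob)))  # [{'a': 1}, {'b': 2}]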
This function gets keys from an s3 bucket with a given prefix:
# Async stuff starts here
async def get_keys(loop, bucket, prefix):
    '''Get keys in bucket based on prefix'''
    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                     aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                     aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
        keys = []
        # list s3 objects using paginator
        paginator = client.get_paginator('list_objects')
        async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for c in result.get('Contents', []):
                keys.append(c['Key'])
        return keys
This function gets the content for a provided key. On top of that, it decodes the content into a flat list of JSONs:
async def get_object(loop, bucket, key):
    '''Get json content from s3 object'''
    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                     aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                     aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
        # get object from s3
        response = await client.get_object(Bucket=bucket, Key=key)
        async with response['Body'] as stream:
            content = await stream.read()
        return list(iterload(content.decode()))
Here is the main function, which gathers the contents for all the found keys and flattens the resulting lists:
async def go(loop, bucket, prefix):
    '''Returns list of dicts of object contents'''
    keys = await get_keys(loop, bucket, prefix)
    contents = await asyncio.gather(*[get_object(loop, bucket, k) for k in keys])
    return list(chain.from_iterable(contents))
Finally, I run this, and the resulting list of dicts ends up nicely in result:
loop = asyncio.get_event_loop()
result = loop.run_until_complete(go(loop, 'some-bucket', 'some-prefix'))
One thing that I think might be a bit weird is that I create a client in each async function. Probably that can be lifted out, but I'm not sure how aiobotocore works with multiple clients.
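For example, I imagine something like this untested sketch, where get_keys and get_object would be changed to accept the shared client instead of a loop:

# untested sketch: create one client in go() and pass it down, so all
# requests share a single connection pool
async def go(loop, bucket, prefix):
    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                     aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                     aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
        keys = await get_keys(client, bucket, prefix)
        contents = await asyncio.gather(
            *[get_object(client, bucket, k) for k in keys])
        return list(chain.from_iterable(contents))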
Furthermore, I don't think you would need to wait until all keys are loaded before fetching the objects for them, which is what happens in this implementation. I'm assuming that as soon as a key is found you could call get_object. So maybe it should be an async generator. But I'm not completely in the clear here.
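What I have in mind is something like this untested sketch (iter_keys is just a name I made up, and it assumes the shared-client variant of get_object from above):

# untested sketch: yield keys as the paginator produces them and schedule
# a get_object task per key right away, instead of waiting for all keys
async def iter_keys(client, bucket, prefix):
    paginator = client.get_paginator('list_objects')
    async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for c in result.get('Contents', []):
            yield c['Key']

async def gather_contents(client, bucket, prefix):
    tasks = [asyncio.ensure_future(get_object(client, bucket, key))
             async for key in iter_keys(client, bucket, prefix)]
    contents = await asyncio.gather(*tasks)
    return list(chain.from_iterable(contents))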
Thank you in advance! Hope this helps someone in a similar situation.
Upvotes: 4
Views: 3791
Reputation: 477
First, check out aioboto3.

Second, each client in aiobotocore is associated with an aiohttp session. Each session can have up to max_pool_connections open connections. This is why the basic aiobotocore example does an async with on create_client: the connection pool is closed when you're done using the client.
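In code, the shape of that pattern is (simplified):

# simplified: cap the connection pool explicitly and scope the client with
# 'async with' so the underlying pool is closed when the block exits
config = aiobotocore.config.AioConfig(max_pool_connections=50)
async with session.create_client('s3', config=config) as client:
    ...  # all requests made through 'client' share this one pool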
Based on the above, here is how I would do it:
import asyncio
from itertools import chain
import json
from typing import List
from json.decoder import WHITESPACE
import logging
from functools import partial

# Third Party
import asyncpool
import aiobotocore.session
import aiobotocore.config

_NUM_WORKERS = 50


def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
    # helper for parsing individual jsons from string of jsons (stolen from somewhere)
    string = str(string_or_fp)
    decoder = cls(**kwargs)
    idx = WHITESPACE.match(string, 0).end()
    while idx < len(string):
        obj, end = decoder.raw_decode(string, idx)
        yield obj
        idx = WHITESPACE.match(string, end).end()


async def get_object(s3_client, bucket: str, key: str):
    # get json content from an s3 object
    response = await s3_client.get_object(Bucket=bucket, Key=key)
    async with response['Body'] as stream:
        content = await stream.read()
    return list(iterload(content.decode()))


async def go(bucket: str, prefix: str) -> List[dict]:
    """
    Returns list of dicts of object contents

    :param bucket: s3 bucket
    :param prefix: s3 bucket prefix
    :return: list of dicts of object contents
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()

    session = aiobotocore.session.AioSession()
    config = aiobotocore.config.AioConfig(max_pool_connections=_NUM_WORKERS)
    contents = []

    async with session.create_client('s3', config=config) as client:
        worker_co = partial(get_object, client, bucket)
        async with asyncpool.AsyncPool(None, _NUM_WORKERS, 's3_work_queue', logger, worker_co,
                                       return_futures=True, raise_on_join=True, log_every_n=10) as work_pool:
            # list s3 objects using paginator
            paginator = client.get_paginator('list_objects')
            async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
                for c in result.get('Contents', []):
                    contents.append(await work_pool.push(c['Key']))

    # retrieve results from futures
    contents = [c.result() for c in contents]
    return list(chain.from_iterable(contents))


_loop = asyncio.get_event_loop()
_result = _loop.run_until_complete(go('some-bucket', 'some-prefix'))
Upvotes: 4