Leonard Aukea

Reputation: 422

Reading multiple "bulked" JSONs from S3 asynchronously. Is there a better way?

The goal is to load a large number of "bulked" JSONs from S3. I found aiobotocore and felt compelled to try it, hoping to gain some efficiency and, at the same time, familiarise myself with asyncio. I gave it a shot and it works, but I know basically nada about asynchronous programming, so I was hoping for some improvements/comments. Maybe there are some kind souls out there who can spot some obvious mistakes.

The problem is that boto3 only supports one HTTP request at a time. By using a thread pool I managed to get significant improvements, but I'm hoping for a more efficient way.
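For context, here is a minimal sketch of that thread-pool approach (assuming plain boto3; the pool size and the fetch helper are just illustrative):

from concurrent.futures import ThreadPoolExecutor

import boto3

s3 = boto3.client('s3', region_name='us-west-2')

def fetch(key):
    # each call blocks on a single HTTP request, so we run many of them in threads
    return s3.get_object(Bucket='some-bucket', Key=key)['Body'].read().decode()

keys = [c['Key'] for c in
        s3.list_objects(Bucket='some-bucket', Prefix='some-prefix').get('Contents', [])]

with ThreadPoolExecutor(max_workers=20) as pool:
    bodies = list(pool.map(fetch, keys))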

Here is the code:

Imports:

import os 
import asyncio
import aiobotocore
from itertools import chain
import json
from json.decoder import WHITESPACE

A helper generator I found somewhere that yields decoded JSON objects from a string containing multiple JSONs:

def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
    '''helper for parsing individual jsons from string of jsons (stolen from somewhere)'''
    string = str(string_or_fp)

    decoder = cls(**kwargs)
    idx = WHITESPACE.match(string, 0).end()
    while idx < len(string):
        obj, end = decoder.raw_decode(string, idx)
        yield obj
        idx = WHITESPACE.match(string, end).end()
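For example, on a string that concatenates two JSON documents (made-up input), it yields one object per document:

bulk = '{"a": 1} {"b": 2}'
print(list(iterload(bulk)))  # [{'a': 1}, {'b': 2}]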

This function gets keys from an S3 bucket with a given prefix:

# Async stuff starts here
async def get_keys(loop, bucket, prefix):
    '''Get keys in bucket based on prefix'''

    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
        keys = []
        # list s3 objects using paginator
        paginator = client.get_paginator('list_objects')
        async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for c in result.get('Contents', []):
                keys.append(c['Key'])
        return keys

This function gets the content for a provided key. On top of that, it decodes the content into a list of individual JSON objects:

async def get_object(loop, bucket, key):
    '''Get json content from s3 object'''
    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID) as client:


        # get object from s3
        response = await client.get_object(Bucket=bucket, Key=key)
        async with response['Body'] as stream:
            content = await stream.read()    

    return list(iterload(content.decode()))       

Here is the main function, which gathers the contents for all the found keys and flattens the list of contents:

async def go(loop, bucket, prefix):
    '''Returns list of dicts of object contents'''
    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID) as client:

        keys = await get_keys(loop, bucket, prefix)

        contents = await asyncio.gather(*[get_object(loop, bucket, k) for k in keys])     

        return list(chain.from_iterable(contents))

Finally, I run this and the resulting list of dicts ends up nicely in result:

loop = asyncio.get_event_loop()
result = loop.run_until_complete(go(loop, 'some-bucket', 'some-prefix'))

Thank you in advance! Hope this helps someone in a similar situation.

Upvotes: 4

Views: 3791

Answers (1)

amohr

Reputation: 477

First, check out aioboto3.

Second, each client in aiobotocore is associated with an aiohttp session. Each session can have up to max_pool_connections connections. This is why the basic aiobotocore example does an async with on create_client: the connection pool is closed when you're done using the client.
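A minimal fragment showing how that pool size is configured (the same calls appear in the full example below; 50 is just the worker count used there):

import aiobotocore.session
import aiobotocore.config

session = aiobotocore.session.AioSession()
# allow up to 50 concurrent connections in the client's underlying aiohttp pool
config = aiobotocore.config.AioConfig(max_pool_connections=50)

async def demo():
    async with session.create_client('s3', config=config) as client:
        ...  # the connection pool is released when this block exits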

Here are some tips:

  1. You should use a work pool (asyncpool, created by me, modularized by CaliDog) to avoid polluting your event loop. When using it, think of your workflow as a stream.
  2. This also avoids asyncio.gather, which leaves tasks running in the background after the first exception is thrown.
  3. You should tune your work pool size and max_pool_connections together, and only use one client with the number of tasks you want to (or can, based on the compute required) support in parallel.
  4. You really don't need to pass the loop around, as with modern Python versions there's one loop per thread (see the short sketch after this list).
  5. You should use AWS profiles (the profile param to Session init) or environment variables so you don't need to hardcode key and region information (also shown in the sketch after this list).
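For tips 4 and 5, a minimal sketch (the profile name is made up; credentials and region come from your AWS config or environment rather than hardcoded values, and asyncio.run manages the loop for you):

import asyncio

import aiobotocore.session

async def demo():
    # region/keys are resolved from the named profile in ~/.aws/config,
    # or from AWS_* environment variables if you drop the profile argument
    session = aiobotocore.session.AioSession(profile='my-profile')
    async with session.create_client('s3') as client:
        resp = await client.list_buckets()
        print([b['Name'] for b in resp['Buckets']])

asyncio.run(demo())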

Based on the above, here is how I would do it:

import asyncio
from itertools import chain
import json
from typing import List
from json.decoder import WHITESPACE
import logging
from functools import partial

# Third Party
import asyncpool
import aiobotocore.session
import aiobotocore.config

_NUM_WORKERS = 50


def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
    # helper for parsing individual jsons from string of jsons (stolen from somewhere)
    string = str(string_or_fp)

    decoder = cls(**kwargs)
    idx = WHITESPACE.match(string, 0).end()
    while idx < len(string):
        obj, end = decoder.raw_decode(string, idx)
        yield obj
        idx = WHITESPACE.match(string, end).end()


async def get_object(s3_client, bucket: str, key: str):
    # Get json content from s3 object

    # get object from s3
    response = await s3_client.get_object(Bucket=bucket, Key=key)
    async with response['Body'] as stream:
        content = await stream.read()

    return list(iterload(content.decode()))


async def go(bucket: str, prefix: str) -> List[dict]:
    """
    Returns list of dicts of object contents

    :param bucket: s3 bucket
    :param prefix: s3 bucket prefix
    :return: list of dicts of object contents
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()

    session = aiobotocore.session.AioSession()
    config = aiobotocore.config.AioConfig(max_pool_connections=_NUM_WORKERS)
    contents = []
    async with session.create_client('s3', config=config) as client:
        worker_co = partial(get_object, client, bucket)
        async with asyncpool.AsyncPool(None, _NUM_WORKERS, 's3_work_queue', logger, worker_co,
                                       return_futures=True, raise_on_join=True, log_every_n=10) as work_pool:
            # list s3 objects using paginator
            paginator = client.get_paginator('list_objects')
            async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
                for c in result.get('Contents', []):
                    contents.append(await work_pool.push(c['Key']))

    # retrieve results from futures
    contents = [c.result() for c in contents]
    return list(chain.from_iterable(contents))


_loop = asyncio.get_event_loop()
_result = _loop.run_until_complete(go('some-bucket', 'some-prefix'))

Upvotes: 4
