Luke
Luke

Reputation: 1834

django and asyncio - fetch data asynchronously from remote REST endpoint

I'm trying to rewrite a django management command in an asynchronous way using asyncio and aiohttp. Those are the files involved:

# rest_async.py
async def t_search_coro(token, loop, **kwargs):
    """
    ws T Search Query:
    kwargs:
    - modification_start_date: (str) Format: YYYY-MM-DDTHH:MM:SS (e.g.: 2013-02-26T11:00:00)
    - modification_end_date: (str) Format: YYYY-MM-DDTHH:MM:SS (e.g.: 2013-02-26T11:00:00)
    - lo_type: (str) LO Type. Defaults to 'Event'
    - status: (str) T Status of the LO. Required
    - portal: portal. Default: settings.PORTAL
    - page_nr: PageNumber querystring parameter. Default: 1
    """
    path = '/services/api/TSearch'
    method = 'GET'
    modification_start_date = kwargs.pop('modification_start_date')
    modification_end_date = kwargs.pop('modification_end_date')
    lo_type = kwargs.pop('lo_type', 'Event')
    status = kwargs.pop('status')
    portal = kwargs.pop('portal', settings.PORTAL)
    page_nr = kwargs.pop('page_nr', 1)
    debugging = kwargs.pop('debugging', True)
    signature_kws = get_signature_kwargs(token, path, method)
    headers = signature_kws.get('headers')
    params = {
        'LOType': lo_type,
        'Status': status,
        'PageNumber': page_nr,
        'format': 'JSON'
    }
    if modification_start_date is not None:
        params['ModificationStartDate'] = modification_start_date
    if modification_end_date is not None:
        params['ModificationEndDate'] = modification_end_date

    service_end_point = 'https://{}.example.net{}'.format(portal, path)
    print("fetching data: {} - {}".format(modification_start_date, modification_end_date))
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url=service_end_point, params=params, headers=headers) as resp:
            assert resp.status == 200
            return await resp.read()


# utils_async.py

async def fetch_t_data_coro(
        loop, lo_type='Session', modification_start_date=now()-timedelta(hours=22), modification_end_date=now(),
    status='Completed', **kwargs):
    date_fmt = "%Y-%m-%dT%H:%M:%S"
    if (modification_end_date - modification_start_date).total_seconds() > timedelta(days=1).total_seconds():
        raise Exception("modification start/end datetime interval must be within 24 hrs."
                        "\nmod. start date: {}\nmod. end date: {}".format(
            modification_start_date.strftime(date_fmt), modification_end_date.strftime(date_fmt)
        ))
    debugging = kwargs.pop('debugging', False)
    page_nr = kwargs.get('page_nr', 1)
    modification_start_date = modification_start_date.strftime(date_fmt)
    modification_end_date = modification_end_date.strftime(date_fmt)
    rtn_data = []
    params = {
        'LOType': lo_type, 'Status': status, 'PageNumber': page_nr, 'format': 'JSON'
    }
    already_added = set()
    while True:
        data = await rest_async.t_search_coro(
            token, loop, modification_start_date=modification_start_date, modification_end_date=modification_end_date,
            lo_type=lo_type, status=status, page_nr=page_nr, debugging=debugging
        )
        data_dict = json.loads(data.decode('utf-8'))
        if 'data' not in data_dict:
            break
        total_pages = data_dict['data'][0]['T_Item']['TotalPages']
        t_raw_data = data_dict['data'][0]['T_Item']['T']
        for item in t_raw_data:
            _h = hash(json.dumps(item, sort_keys=True))
            if _h in already_added:
                continue
            already_added.add(_h)
            rtn_data.append(item)
        if page_nr >= total_pages:
            break
        page_nr += 1
    return rtn_data


# load_data_async.py (actual django management command)

import asyncio
from datetime import timedelta, datetime
import argparse
import logging

from django.core.management.base import BaseCommand
from django.utils.timezone import now

from myapp.utils_async import fetch_transcript_data_coro

RUNNING_INTERVAL_MINS = 60
logger = logging.getLogger('my_proj')
MAX_BACKDAYS = 160
BACKDAYS_HOURS = {3, 9, 15, 21}
DEFAULT_TIMEFRAME=24
GO_BACK_DAYS = 30
GO_BACK_DAYS_TIMEFRAME = 24


class Command(BaseCommand):
    help = "fetch data asynchrounously"

    def add_arguments(self, parser):
        parser.add_argument(
            '--timeframe', action='store', dest='timeframe', default=DEFAULT_TIMEFRAME, type=int,
            help='Timeframe hours to be used (default to 24, range: 1 to 24)'
        )
        parser.add_argument(
            '--backdays', action='store', dest='backdays', default=None, type=int,
            help='repeat the command execution (for the same timeframe) n days before the current day'
        )

        parser.add_argument('--start-date', type=valid_date_type)
        parser.add_argument('--end-date', type=valid_date_type)

    def handle(self, *args, **options):
        self.loop = asyncio.get_event_loop()
        self.loop.run_until_complete(self._handle(*args, **options))

    async def _handle(self, *args, **options):
        timeframe = options.get('timeframe')
        backdays = options.get('backdays', None)
        start_date = options.get('start_date')
        end_date = options.get('end_date')
        backdays = backdays + 1 if backdays is not None else 1
        if all([start_date is not None, end_date is not None]):
            days_range = [start_date + timedelta(days=x) for x in range((end_date - start_date).days + 1)]
        else:
            days_range = [now() - timedelta(days=x) for x in range(backdays)]
        for mod_end_datetime in days_range:
            mod_start_datetime = mod_end_datetime - timedelta(minutes=RUNNING_INTERVAL_MINS * timeframe)
            data = await fetch_t_data_coro(
                loop=self.loop, modification_start_date=mod_start_datetime, modification_end_date=mod_end_datetime
            )

def valid_date_type(arg_date_str):
    try:
        return datetime.strptime(arg_date_str, "%Y-%m-%d")
    except ValueError:
        msg = "Given Date ({0}) not valid! Expected format, YYYY-MM-DD!".format(arg_date_str)
        raise argparse.ArgumentTypeError(msg)

I then tried to run the cmd as:

python manage.py load_data_async --start-date 2018-04-20 --end-date 2018-06-6

the command runs without errors, however it seems from the print statement that the coroutines are executed sequentially, in the same way of the original synchrounous code:

# output
fetching data: 2018-04-19T00:00:00 - 2018-04-20T00:00:00
fetching data: 2018-04-19T00:00:00 - 2018-04-20T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-20T00:00:00 - 2018-04-21T00:00:00
fetching data: 2018-04-21T00:00:00 - 2018-04-22T00:00:00
fetching data: 2018-04-21T00:00:00 - 2018-04-22T00:00:00
fetching data: 2018-04-21T00:00:00 - 2018-04-22T00:00:00
fetching data: 2018-04-22T00:00:00 - 2018-04-23T00:00:00
fetching data: 2018-04-23T00:00:00 - 2018-04-24T00:00:00
fetching data: 2018-04-24T00:00:00 - 2018-04-25T00:00:00
fetching data: 2018-04-24T00:00:00 - 2018-04-25T00:00:00
fetching data: 2018-04-25T00:00:00 - 2018-04-26T00:00:00
fetching data: 2018-04-25T00:00:00 - 2018-04-26T00:00:00
fetching data: 2018-04-25T00:00:00 - 2018-04-26T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00
fetching data: 2018-04-26T00:00:00 - 2018-04-27T00:00:00

...
...
fetching data: 2018-05-22T00:00:00 - 2018-05-23T00:00:00
fetching data: 2018-05-22T00:00:00 - 2018-05-23T00:00:00
fetching data: 2018-05-23T00:00:00 - 2018-05-24T00:00:00
fetching data: 2018-05-23T00:00:00 - 2018-05-24T00:00:00
fetching data: 2018-05-24T00:00:00 - 2018-05-25T00:00:00
fetching data: 2018-05-25T00:00:00 - 2018-05-26T00:00:00
fetching data: 2018-05-25T00:00:00 - 2018-05-26T00:00:00
fetching data: 2018-05-25T00:00:00 - 2018-05-26T00:00:00
fetching data: 2018-05-25T00:00:00 - 2018-05-26T00:00:00
fetching data: 2018-05-26T00:00:00 - 2018-05-27T00:00:00
fetching data: 2018-05-27T00:00:00 - 2018-05-28T00:00:00
fetching data: 2018-05-28T00:00:00 - 2018-05-29T00:00:00
fetching data: 2018-05-29T00:00:00 - 2018-05-30T00:00:00
fetching data: 2018-05-30T00:00:00 - 2018-05-31T00:00:00
fetching data: 2018-05-30T00:00:00 - 2018-05-31T00:00:00
fetching data: 2018-05-30T00:00:00 - 2018-05-31T00:00:00
fetching data: 2018-05-31T00:00:00 - 2018-06-01T00:00:00
fetching data: 2018-05-31T00:00:00 - 2018-06-01T00:00:00
fetching data: 2018-06-01T00:00:00 - 2018-06-02T00:00:00
fetching data: 2018-06-01T00:00:00 - 2018-06-02T00:00:00
fetching data: 2018-06-01T00:00:00 - 2018-06-02T00:00:00
fetching data: 2018-06-01T00:00:00 - 2018-06-02T00:00:00
fetching data: 2018-06-02T00:00:00 - 2018-06-03T00:00:00
fetching data: 2018-06-02T00:00:00 - 2018-06-03T00:00:00
fetching data: 2018-06-02T00:00:00 - 2018-06-03T00:00:00
fetching data: 2018-06-03T00:00:00 - 2018-06-04T00:00:00
fetching data: 2018-06-03T00:00:00 - 2018-06-04T00:00:00
fetching data: 2018-06-04T00:00:00 - 2018-06-05T00:00:00
fetching data: 2018-06-04T00:00:00 - 2018-06-05T00:00:00
fetching data: 2018-06-05T00:00:00 - 2018-06-06T00:00:00
fetching data: 2018-06-05T00:00:00 - 2018-06-06T00:00:00
fetching data: 2018-06-05T00:00:00 - 2018-06-06T00:00:00
fetching data: 2018-06-05T00:00:00 - 2018-06-06T00:00:00

Anyone noticed something wrong? or this is the correct behavior? I have no experience with asyncio but I was expecting not a sequential execution...

python version: 3.6.3

Upvotes: 1

Views: 1304

Answers (1)

user4815162342
user4815162342

Reputation: 154906

The code seems to await the fetch_t_data_coro invocations one by one, which forces them to run in sequence.

To run them in parallel, you can use asyncio.gather:

        coros = []
        for mod_end_datetime in days_range:
            mod_start_datetime = mod_end_datetime - timedelta(minutes=RUNNING_INTERVAL_MINS * timeframe)
            coros.append(fetch_t_data_coro(
                loop=self.loop, modification_start_date=mod_start_datetime, modification_end_date=mod_end_datetime
            ))
        data_list = await asyncio.gather(*coros)

Two unrelated notes:

  • The code instantiates aiohttp.ClientSession in each t_search_coro. This is an anti-pattern - you should create a single ClientSession at top-level and pass it down to individual coroutines (even ones running in parallel), so that they all share the same session instance.
  • Beginning with Python 3.5.3, asyncio.get_event_loop() will correctly pick up the running event loop when called from a coroutine. As a result, you don't need to send the loop object down the coroutine invocations, just call get_event_loop when you need it (which in your code you don't, since ClientSession also correctly infers the event loop on its own).

Upvotes: 2

Related Questions