TMarks

Reputation: 564

Utilizing asyncio generators and asyncio.as_completed

I have some code that I am using to scrape a url, parse the information, and then drop it into a DB using SQLAlchemy. I'm trying to do it asynchronously while limiting the maximum number of simultaneous requests.

Here is my code:

async def get_url(session, url1, url2):
    async with session.get(url1) as r_url1:
        if r_url1.status == 200:
            async with session.get(url2) as r_url2:
                if r_url2.status == 200:
                    return await r_url1.json(), await r_url2.json()

async def url_generator(formatted_start_date, formatted_end_date, machine_id, interval):
    interval_start = formatted_start_date
    interval_end = formatted_start_date + interval

    while interval_end <= formatted_end_date:
        yield (f"https://example.org/start={interval_start}"
               f"Start={datetime.strftime(interval_start, DATETIME_FORMAT)}"
               f"&End={datetime.strftime(interval_end, DATETIME_FORMAT)}"
               f"&machines={machine_id}",
               f"https://example.org/start={interval_start}"
               f"Start={datetime.strftime(interval_start, DATETIME_FORMAT)}"
               f"&End={datetime.strftime(interval_end, DATETIME_FORMAT)}"
               f"&machines={machine_id}&groupby=Job"
               )
        interval_start += interval
        interval_end += interval

async def parse(database, url1_json, url2_json):
    """ Do some parsing and save it using credentials stored in the database object """


def main(database, formatted_start_date, formatted_end_date, machine_id, interval):
    async for url1_json, url2_json in asyncio.as_completed(url_generator(formatted_start_date, formatted_end_date, machine_id, interval)):
         parse(database, url1_json, url2_json)

I am getting the error "yield from should be used as context manager expression".

I've tried looking at the documentation here, as well as the Synchronization Primitives docs, and I'm still confused about where I've gone wrong and how I should go about creating tasks from my generator.

Upvotes: 4

Views: 1931

Answers (1)

user4815162342

Reputation: 155670

There are several problems with the posted code:

  • You're trying to use as_completed as an async iterator, iterating over its results with async for. However, as_completed doesn't return an async iterator (at least not yet) and must be iterated with a regular for, awaiting each yielded object explicitly, as shown in the docs and in the sketch after this list.

  • You are passing an async iterator to as_completed, whereas it accepts an ordinary container or a (regular) iterable.

  • You are using async for in a function not defined with async def, which should be a syntax error. Also, parse() is defined as a coroutine, and you are not awaiting it.
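
For reference, the documented way to consume as_completed looks roughly like this (a minimal sketch; fetch and the URL list are placeholders, not code from the question):

import asyncio

async def fetch(url):
    # stand-in for the real request coroutine
    await asyncio.sleep(0.1)
    return url

async def main(urls):
    coros = [fetch(u) for u in urls]  # a plain list of coroutines
    # regular "for", not "async for": as_completed yields awaitables
    # in completion order, and each one must be awaited explicitly
    for next_done in asyncio.as_completed(coros):
        result = await next_done
        print(result)

asyncio.run(main(["https://example.org/a", "https://example.org/b"]))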

The good news is that, since url_generator is already an async generator, you don't need as_completed at all; you should be able to just iterate over it:

async def main(database, formatted_start_date, formatted_end_date,
               machine_id, interval):
    async for url1_json, url2_json in url_generator(
            formatted_start_date, formatted_end_date,
            machine_id, interval):
        await parse(database, url1_json, url2_json)

Note, however, that async for won't automatically parallelize the iteration, it will just allow other coroutines to run in parallel with the coroutine that iterates. To parallelize the iteration, you need to call create_task to submit the tasks in parallel, and use an asyncio.Semaphore to limit the number of parallel tasks. For example:

async def parse(database, url1_json, url2_json, limit):
    # async with applied to a semaphore ensures that no more than N
    # coroutines that use the same semaphore enter the "with" block
    # in parallel
    async with limit:
        ... code goes here ...

async def main(database, formatted_start_date, formatted_end_date,
               machine_id, interval):
    limit = asyncio.Semaphore(10)

    # create all coroutines in advance using create_task
    # and run them in parallel, relying on the semaphore
    # to limit the number of simultaneous requests
    tasks = []
    async for url1_json, url2_json in url_generator(
            formatted_start_date, formatted_end_date,
            machine_id, interval):
        # this create_task just creates the task - it will
        # start running when we return to the event loop
        tasks.append(asyncio.create_task(parse(database, url1_json, url2_json, limit)))

    # suspend to the event loop, resuming this coroutine only after
    # all the tasks have finished (or any of them raises)
    await asyncio.gather(*tasks)

Note that url_generator doesn't need to be async because it doesn't need to await anything. You can define it with def and iterate over it with for.
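
For illustration, a plain-generator version of those two pieces might look roughly like this (a sketch only; the URL formatting is abbreviated compared to the question, and the asyncio.run entry point is just one possible way to launch it):

import asyncio

def url_generator(formatted_start_date, formatted_end_date, machine_id, interval):
    # an ordinary generator: nothing in here needs to be awaited
    interval_start = formatted_start_date
    interval_end = formatted_start_date + interval
    while interval_end <= formatted_end_date:
        # build the two URLs as in the question (abbreviated here)
        yield (f"https://example.org/?Start={interval_start}&End={interval_end}&machines={machine_id}",
               f"https://example.org/?Start={interval_start}&End={interval_end}&machines={machine_id}&groupby=Job")
        interval_start += interval
        interval_end += interval

async def main(database, formatted_start_date, formatted_end_date, machine_id, interval):
    limit = asyncio.Semaphore(10)
    tasks = []
    # a regular "for" is enough now that url_generator is not async
    for url1, url2 in url_generator(formatted_start_date, formatted_end_date, machine_id, interval):
        tasks.append(asyncio.create_task(parse(database, url1, url2, limit)))
    await asyncio.gather(*tasks)

# one possible entry point:
# asyncio.run(main(database, start_date, end_date, machine_id, interval))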

Upvotes: 4
