Farhood ET

Reputation: 1541

Making sense of asyncio.as_completed()

Imagine a simple program like this, which I'm using to get a field of data from multiple pages of JSON data available through an API gateway. (Sorry, I couldn't find a free JSON API that supports pagination to make the example completely reproducible.)

import asyncio
import aiohttp

async def fetch(url, params = None):
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=params) as response:
            return await response.json()

async def get_all_pages(base_url):
    def paginate(size=10**6):
        limit = 100
        offset = 0
        while offset <= size:
            yield {"offset": offset, "limit": limit}
            offset += limit
    total = (await fetch(base_url))["data"]["total"] # total number of pages
    coroutines = [fetch(base_url, params) for params in paginate(total)]
    print("total number of pages: {}, total number of coroutines: {}".format(
        total, len(coroutines)))
    for routine in asyncio.as_completed(coroutines):
        r = await routine
        yield r["data"]["field"] #a field in the data for each page

async def main():
    url = "http://arandomurl.com"
    results = []
    async for x in get_all_pages(url):
        results.append(x)

    print(len(results)) #returns 1 -> only the first element is returned

asyncio.run(main())

The problem is that the async for loop in my main function only retrieves the first element from my generator; somehow the generator stops after yielding the first element. That means as_completed is not working the way I thought it would inside get_all_pages: awaiting each coroutine as it completes and passing its result on to the yield r["data"]["field"] line. How can I do this correctly?
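For reference, here is a minimal self-contained sketch of how I understood as_completed to behave, with asyncio.sleep-based stand-ins (the job coroutine and its delays are made up for illustration) in place of the HTTP calls:

```python
import asyncio

async def job(i, delay):
    # stand-in for fetch(): finishes after `delay` seconds
    await asyncio.sleep(delay)
    return i

async def gen():
    # same pattern as get_all_pages: yield each result as soon as it completes
    coroutines = [job(0, 0.03), job(1, 0.01), job(2, 0.02)]
    for routine in asyncio.as_completed(coroutines):
        yield await routine

async def main():
    results = [x async for x in gen()]
    print(results)  # results appear in completion order, not submission order

asyncio.run(main())
```

Running this, I do get all three results back (in completion order), which is what I expected as_completed to do in my real program as well.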

Upvotes: 1

Views: 314

Answers (1)

Paul Cornelius

Reputation: 10946

Here is a test program I wrote. I took the code posted in the question and replaced the guts of the function "fetch" to return a dictionary. With this change I can actually run the program, and it works. I get one item in "results" for every 100 "pages."

import asyncio

async def fetch(_url, params = None):
    if params is None:
        return {"data": {"total": 169}}
    return {"data": {"field" : str(params)}}

async def get_all_pages(base_url):
    def paginate(size=10**6):
        limit = 100
        offset = 0
        while offset <= size:
            yield {"offset": offset, "limit": limit}
            offset += limit
    total = (await fetch(base_url))["data"]["total"] # total number of pages
    coroutines = [fetch(base_url, params) for params in paginate(total)]
    print("total number of pages: {}, total number of coroutines: {}".format(
        total, len(coroutines)))
    for routine in asyncio.as_completed(coroutines):
        r = await routine
        yield r["data"]["field"] #a field in the data for each page

async def main():
    url = "http://arandomurl.com"
    results = []
    async for x in get_all_pages(url):
        results.append(x)

    print(results)  # one item for every 100 "pages"

asyncio.run(main())

Upvotes: 1
