Reputation: 1541
Imagine a simple program like this, which I'm using to get a field of data from multiple pages of JSON data available through an API gateway. (Sorry, I couldn't find a free JSON API that supports pagination to make the example completely reproducible.)
import asyncio
import aiohttp

async def fetch(url, params=None):
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=params) as response:
            return await response.json()

async def get_all_pages(base_url):
    def paginate(size=10**6):
        limit = 100
        offset = 0
        while offset <= size:
            yield {"offset": offset, "limit": limit}
            offset += limit

    total = (await fetch(base_url))["data"]["total"]  # total number of pages
    coroutines = [fetch(base_url, params) for params in paginate(total)]
    print("total number of pages: {}, total number of coroutines: {}".format(
        total, len(coroutines)))
    for routine in asyncio.as_completed(coroutines):
        r = await routine
        yield r["data"]["field"]  # a field in the data for each page

async def main():
    url = "http://arandomurl.com"
    results = []
    async for x in get_all_pages(url):
        results.append(x)
    print(len(results))  # prints 1 -> only the first element is returned

asyncio.run(main())
The problem is that the for loop in my main function only retrieves the first element of my generator; somehow the generator stops after yielding the first element. That means as_completed is not working the way I thought it would in get_all_pages: awaiting each coroutine as it completes and passing its result to the yield r["data"]["field"] line. How can I do this correctly?
Upvotes: 1
Views: 314
Reputation: 10946
Here is a test program I wrote. I took the code posted in the question and replaced the guts of the function "fetch" so that it returns a dictionary. With this change I can actually run the program, and it works: I get one item in "results" for every 100 "pages" (with a total of 169, paginate yields offsets 0 and 100, so there are two coroutines and two results).
import asyncio

async def fetch(_url, params=None):
    # Stand-in for the real fetch: report a fixed total when called
    # without params, otherwise echo the pagination params back.
    if params is None:
        return {"data": {"total": 169}}
    return {"data": {"field": str(params)}}

async def get_all_pages(base_url):
    def paginate(size=10**6):
        limit = 100
        offset = 0
        while offset <= size:
            yield {"offset": offset, "limit": limit}
            offset += limit

    total = (await fetch(base_url))["data"]["total"]  # total number of pages
    coroutines = [fetch(base_url, params) for params in paginate(total)]
    print("total number of pages: {}, total number of coroutines: {}".format(
        total, len(coroutines)))
    for routine in asyncio.as_completed(coroutines):
        r = await routine
        yield r["data"]["field"]  # a field in the data for each page

async def main():
    url = "http://arandomurl.com"
    results = []
    async for x in get_all_pages(url):
        results.append(x)
    print(results)  # prints one item for every 100 "pages"

asyncio.run(main())
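So the async-generator/as_completed pattern itself is fine, and the problem is most likely in the real fetch or in the shape of the API's responses rather than in the pagination logic. One aiohttp detail worth tightening while debugging: the question's fetch opens a new ClientSession for every request, whereas aiohttp recommends creating one session and reusing it. Below is a minimal sketch of get_all_pages with a shared session, assuming the same hypothetical http://arandomurl.com response shape used in the question:

import asyncio
import aiohttp

async def get_all_pages(base_url):
    # One ClientSession reused for all requests, as aiohttp recommends,
    # instead of opening a fresh session per call.
    async with aiohttp.ClientSession() as session:

        async def fetch(params=None):
            # Note: params is a keyword argument of session.get.
            async with session.get(base_url, params=params) as response:
                return await response.json()

        def paginate(size):
            limit = 100
            offset = 0
            while offset <= size:
                yield {"offset": offset, "limit": limit}
                offset += limit

        total = (await fetch())["data"]["total"]
        coroutines = [fetch(params) for params in paginate(total)]
        for routine in asyncio.as_completed(coroutines):
            r = await routine
            yield r["data"]["field"]

The main function from the question works unchanged with this version; the session is closed automatically when the generator is exhausted.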
Upvotes: 1