Reputation: 3
I am struggling to wrap my head around the asyncio library. I thought you could simply define the sections of your code that you want to run asynchronously, but in all the examples I have seen, people tend to define their main function as asynchronous. Here is the code that I have written:
async def download_post(id, path):
    print(f"Begin downloading {id}")
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{apiurl}item/{id}.json?print=pretty") as resp:
            json = await resp.json()
            content = await resp.text()
    print(f"Done downloading {id}")
    if json["type"] == "story":
        print(f"Begin writing to {id}.json")
        with open(os.path.join(path, f"{id}.json"), "w") as file:
            file.write(content)
        print(f"Done writing to {id}.json")

async def update_posts(path):
    myid = get_myid(path)
    if myid < maxid:  # Database can be updated
        for id in range(myid+1, maxid):
            await download_post(id, path)

def main():
    if not os.path.exists(posts_dir):
        os.makedirs(posts_dir)
    path = os.path.join(os.getcwd(), posts_dir)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(update_posts(path))
    loop.close()
    #domain_counts(path)

if __name__ == '__main__':
    main()
The key here is that range(myid+1, maxid) is very large, and requests.get() takes a relatively long time. However, after trying to switch from requests to aiohttp with asyncio, I am still getting responses one-by-one, as shown in the output below:
Begin downloading 1
Done downloading 1
Begin writing to 1.json
Done writing to 1.json
Begin downloading 2
Done downloading 2
Begin writing to 2.json
Done writing to 2.json
Begin downloading 3
Done downloading 3
Begin writing to 3.json
Done writing to 3.json
Begin downloading 4
Done downloading 4
Begin writing to 4.json
Done writing to 4.json
I thought about splitting the download and write-to-file code into different functions, but then I'm not sure whether both of those would have to be async as well. Also, I think I would have to put await in front of many of the variable assignments. These are the resources I have been referencing:
Does anyone have some good resources I could go over to better understand what I am doing wrong? I've noticed many examples use asyncio.gather(), but I don't really understand how it's used. Do I need to put async in front of every function/with, and await in front of every variable?
Upvotes: 0
Views: 1591
Reputation: 1669
A couple of important things:
This is where asyncio comes to your rescue. In your case, update_posts() doesn't really seem like an async method in the ideal sense, because it is technically only used to figure out which posts need to be downloaded and written.
And since we are already discussing downloading and writing, notice that you can actually make them run as independent tasks to ensure minimal downtime.
Here is how I might approach this:
import asyncio
from asyncio import Queue

import aiohttp
import os


async def generate_download_post_tasks(path, queue: Queue):
    # Producer: figures out which posts need downloading and queues them as work items.
    myid = get_myid(path)
    if myid < maxid:  # Database can be updated
        for id in range(myid+1, maxid):
            queue.put_nowait((id, path))


async def download_post_tasks(download_queue: Queue, write_queue: Queue):
    # Consumer: downloads each queued post and hands "story" items to the write queue.
    async with aiohttp.ClientSession() as session:
        while True:
            download_request_id, path = await download_queue.get()
            async with session.get(f"{apiurl}item/{download_request_id}.json?print=pretty") as resp:
                json = await resp.json()
                content = await resp.text()
                print(f"Done downloading {download_request_id}")
                if json["type"] == "story":
                    write_queue.put_nowait((download_request_id, content, path))


async def write_post_tasks(write_queue: Queue):
    # Consumer: writes downloaded posts to disk.
    while True:
        post_id, post_content, path = await write_queue.get()
        print(f"Begin writing to {post_id}.json")
        with open(os.path.join(path, f"{post_id}.json"), "w") as file:
            file.write(post_content)
        print(f"Done writing to {post_id}.json")


async def async_main():
    if not os.path.exists(posts_dir):
        os.makedirs(posts_dir)
    path = os.path.join(os.getcwd(), posts_dir)

    tasks = set()
    download_queue = Queue()
    write_queue = Queue()

    tasks.add(asyncio.create_task(generate_download_post_tasks(path=path, queue=download_queue)))
    tasks.add(asyncio.create_task(download_post_tasks(download_queue=download_queue, write_queue=write_queue)))
    tasks.add(asyncio.create_task(write_post_tasks(write_queue=write_queue)))

    wait_time = 100

    try:
        # The worker loops never finish on their own, so stop everything after wait_time seconds.
        await asyncio.wait_for(asyncio.gather(*tasks), wait_time)
    except Exception:
        # Catch errors, including the TimeoutError raised by wait_for
        print("End!!")


if __name__ == '__main__':
    asyncio.run(async_main())
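On the asyncio.gather() question: gather() takes several coroutines (or tasks), schedules them all on the event loop at once, and waits until every one of them has finished. That is what lets the requests overlap instead of being awaited one-by-one inside a for loop. Below is a minimal sketch of the same download reworked around gather() with an asyncio.Semaphore to cap how many requests are in flight; the apiurl value is a placeholder you would replace with the one from your script, the limit of 20 and the id range in the __main__ block are arbitrary, and fetch_one/download_range are illustrative names, not part of your code.

import asyncio
import os

import aiohttp

apiurl = "https://example.invalid/"  # placeholder: substitute the same apiurl used in your script


async def fetch_one(session, sem, id, path):
    # The semaphore caps how many requests are in flight at the same time.
    async with sem:
        print(f"Begin downloading {id}")
        async with session.get(f"{apiurl}item/{id}.json?print=pretty") as resp:
            data = await resp.json()
            content = await resp.text()
        print(f"Done downloading {id}")
    if data and data.get("type") == "story":
        with open(os.path.join(path, f"{id}.json"), "w") as file:
            file.write(content)


async def download_range(path, start_id, end_id):
    sem = asyncio.Semaphore(20)  # arbitrary concurrency limit
    async with aiohttp.ClientSession() as session:
        # gather() schedules every coroutine at once and waits for all of them,
        # so the downloads overlap instead of running one-by-one.
        await asyncio.gather(*(fetch_one(session, sem, id, path)
                               for id in range(start_id, end_id)))


if __name__ == "__main__":
    asyncio.run(download_range(os.getcwd(), 1, 51))

For a very large id range you want some cap like that semaphore, or equivalently several copies of the download_post_tasks worker above reading from the same queue, since a single worker still awaits one response at a time.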
Upvotes: 1