Reputation: 83
I'm trying to fetch some data from OpenSubtitles using asyncio and then download a file whose information is contained in that data. I want to fetch that data and download the file concurrently using asyncio.
The problem is that I want to wait for one task from the list tasks to finish before commencing with the rest of the tasks in that list or with the download_tasks. The reason for this is that in self._perform_query() I am writing information to a file, and in self._download_and_save_file() I am reading that same information from that file. In other words, the download_tasks need to wait for at least one task in tasks to finish before starting.
I found out I can do that with asyncio.wait(return_when=asyncio.FIRST_COMPLETED), but for some reason it is not working properly:
payloads = [create_payloads(entry) for entry in retrieve(table_in_database)]
tasks = [asyncio.create_task(self._perform_query(payload, proxy)) for payload in payloads]
download_tasks = [asyncio.create_task(self._download_and_save_file(url, proxy)) for url in url_list]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(done)
print(len(done))
print(pending)
print(len(pending))
await asyncio.wait(download_tasks)
The output is completely different than expected. It seems that all 3 tasks in the list tasks are being completed, despite me passing asyncio.FIRST_COMPLETED. Why is this happening?
{<Task finished coro=<SubtitleDownloader._perform_query() done, defined at C:\Users\...\subtitles.py:71> result=None>, <Task finished coro=<SubtitleDownloader._perform_query() done, defined at C:\Users\...\subtitles.py:71> result=None>, <Task finished coro=<SubtitleDownloader._perform_query() done, defined at C:\Users\...\subtitles.py:71> result=None>}
3
set()
0
Exiting
As far as I can tell, the code in self._perform_query() shouldn't affect this problem, but here it is anyway just to make sure:
async def _perform_query(self, payload, proxy):
    try:
        query_result = proxy.SearchSubtitles(self.opensubs_token, [payload], {"limit": 25})
    except Fault as e:
        raise RuntimeError("A fault has occurred:\n{}".format(e))
    except ProtocolError as e:
        raise RuntimeError("A ProtocolError has occurred:\n{}".format(e))
    else:
        if query_result["status"] == "200 OK":
            with open("dl_links.json", "w") as dl_links_json:
                result = query_result["data"][0]
                subtitle_name = result["SubFileName"]
                download_link = result["SubDownloadLink"]
                download_data = {"download link": download_link,
                                 "file name": subtitle_name}
                json.dump(download_data, dl_links_json)
        else:
            print("Wrong status code: {}".format(query_result["status"]))
For now, I've been testing this without running download_tasks, but I have included it here for context. Maybe I am going about this problem in a completely wrong manner; if so, I would much appreciate your input!
Edit:
The problem was very simple, as answered below: _perform_query wasn't an awaitable function; instead it ran synchronously. I changed that by making the file-writing part of _perform_query asynchronous with aiofiles:
async def _perform_query(self, payload, proxy):
    query_result = proxy.SearchSubtitles(self.opensubs_token, [payload], {"limit": 25})
    if query_result["status"] == "200 OK":
        async with aiofiles.open("dl_links.json", mode="w") as dl_links_json:
            result = query_result["data"][0]
            download_link = result["SubDownloadLink"]
            await dl_links_json.write(download_link)
Upvotes: 0
Views: 2171
Reputation: 155046
return_when=FIRST_COMPLETED doesn't guarantee that only a single task will complete. It guarantees that the wait will complete as soon as a task completes, but it is perfectly possible for other tasks to complete "at the same time", which for asyncio means in the same iteration of the event loop. Consider, for example, the following code:
async def noop():
    pass

async def main():
    done, pending = await asyncio.wait(
        [noop(), noop(), noop()], return_when=asyncio.FIRST_COMPLETED)
    print(len(done), len(pending))

asyncio.run(main())
This prints 3 0, just like your code. Why?
asyncio.wait does two things: it submits the coroutines to the event loop, and it sets up callbacks to notify it when any of them completes. However, the noop coroutine doesn't contain an await, so none of the calls to noop() suspends; each just does its thing and immediately returns. As a result, all three coroutine instances finish within the same pass of the event loop. wait is then informed that all three coroutines have finished, a fact it dutifully reports.
If you change noop to await a random sleep, e.g. change pass to await asyncio.sleep(0.1 * random.random()), you get the expected behavior. With an await the coroutines no longer complete at the same time, and wait will report the first one as soon as it detects it.
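To illustrate, here is a minimal, self-contained variant of the noop example with the random sleep added (the sleeper coroutine and its task bookkeeping are illustrative, not part of the question's code); because each task now suspends for a different interval, wait returns after the first completion and reports the rest as pending:

```python
import asyncio
import random

async def sleeper(i):
    # Each task suspends for a different random interval, so they
    # no longer all finish in the same event-loop iteration.
    await asyncio.sleep(0.1 * random.random())
    return i

async def main():
    tasks = [asyncio.create_task(sleeper(i)) for i in range(3)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(len(done), len(pending))  # typically "1 2"
    # Clean up: cancel and reap the still-pending tasks before the loop closes.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)

done_n, pending_n = asyncio.run(main())
```

Note that tasks are created with asyncio.create_task before being passed to asyncio.wait; passing bare coroutines to wait is deprecated and rejected in recent Python versions.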
This reveals the true underlying issue with your code: _perform_query doesn't await anything. This indicates that you are not using an async underlying library, or that you are using it incorrectly. The call to SearchSubtitles likely simply blocks the event loop, which appears to work in trivial tests but breaks essential asyncio features such as concurrent execution of tasks.
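If the library behind SearchSubtitles is synchronous, one common workaround is to push the blocking call into a worker thread with asyncio.to_thread (Python 3.9+; on older versions, loop.run_in_executor), so the event loop stays free to run other tasks. A sketch under that assumption — search_subtitles_blocking below is a hypothetical stand-in for the real blocking call, not OpenSubtitles' API:

```python
import asyncio
import time

def search_subtitles_blocking(payload):
    # Hypothetical stand-in for a blocking library call such as
    # proxy.SearchSubtitles(...); time.sleep simulates network latency.
    time.sleep(0.1)
    return {"status": "200 OK", "payload": payload}

async def perform_query(payload):
    # Run the blocking call in a thread so it doesn't stall the event loop.
    return await asyncio.to_thread(search_subtitles_blocking, payload)

async def main():
    # All three queries now genuinely run concurrently.
    return await asyncio.gather(*(perform_query(p) for p in ("a", "b", "c")))

results = asyncio.run(main())
```

With this change, perform_query actually suspends at the await, so asyncio.wait(..., return_when=FIRST_COMPLETED) behaves as expected.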
Upvotes: 2