lukaabra

Reputation: 83

Why are all the tasks completed in asyncio.wait() when I clearly indicate that I want only the first one completed?

I'm trying to fetch some data from OpenSubtitles and then download a file whose information is contained in that data, doing both concurrently with asyncio.

The problem is that I want at least 1 task from the list tasks to finish before commencing with the rest of the tasks in that list or with the download_tasks. The reason is that self._perform_query() writes information to a file and self._download_and_save_file() reads that same information from that file. In other words, the download_tasks need to wait for at least one task in tasks to finish before starting.

I found out I can do that with asyncio.wait(return_when=asyncio.FIRST_COMPLETED), but for some reason it is not working properly:

payloads = [create_payloads(entry) for entry in retreive(table_in_database)] 
tasks = [asyncio.create_task(self._perform_query(payload, proxy)) for payload in payloads]
download_tasks = [asyncio.create_task(self._download_and_save_file(url, proxy)) for url in url_list]

done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(done)
print(len(done))
print(pending)
print(len(pending))
await asyncio.wait(download_tasks)

The output is completely different from what I expected. It seems that all 3 tasks in the list tasks are being completed despite me passing asyncio.FIRST_COMPLETED. Why is this happening?

{<Task finished coro=<SubtitleDownloader._perform_query() done, defined at C:\Users\...\subtitles.py:71> result=None>, <Task finished coro=<SubtitleDownloader._perform_query() done, defined at C:\Users\...\subtitles.py:71> result=None>, <Task finished coro=<SubtitleDownloader._perform_query() done, defined at C:\Users\...\subtitles.py:71> result=None>}
3
set()
0
Exiting

As far as I can tell, the code in self._perform_query() shouldn't affect this problem. Here it is anyway just to make sure:

import json
from xmlrpc.client import Fault, ProtocolError  # assuming the standard-library XML-RPC client

async def _perform_query(self, payload, proxy):
    try:
        query_result = proxy.SearchSubtitles(self.opensubs_token, [payload], {"limit": 25})
    except Fault as e:
        # raising a bare string is invalid in Python 3; wrap the message in an exception
        raise Exception("A fault has occurred:\n{}".format(e)) from e
    except ProtocolError as e:
        raise Exception("A ProtocolError has occurred:\n{}".format(e)) from e
    else:
        if query_result["status"] == "200 OK":
            with open("dl_links.json", "w") as dl_links_json:
                result = query_result["data"][0]
                subtitle_name = result["SubFileName"]
                download_link = result["SubDownloadLink"]
                download_data = {"download link": download_link,
                                 "file name": subtitle_name}
                json.dump(download_data, dl_links_json)
        else:
            print("Wrong status code: {}".format(query_result["status"]))

For now, I've been testing this without running download_tasks but I have included it here for context. Maybe I am going about this problem in a completely wrong manner. If so, I would much appreciate your input!

Edit:

The problem was very simple, as answered below: _perform_query wasn't actually awaitable; it ran synchronously from start to finish. I fixed that by declaring it async and making the file-writing part asynchronous with aiofiles:

async def _perform_query(self, payload, proxy):
    query_result = proxy.SearchSubtitles(self.opensubs_token, [payload], {"limit": 25})
    if query_result["status"] == "200 OK":
        async with aiofiles.open("dl_links.json", mode="w") as dl_links_json:
            result = query_result["data"][0]
            download_link = result["SubDownloadLink"]
            await dl_links_json.write(download_link)

Upvotes: 0

Views: 2171

Answers (1)

user4815162342

Reputation: 155046

return_when=FIRST_COMPLETED doesn't guarantee that only a single task will complete. It guarantees that the wait will complete as soon as a task completes, but it is perfectly possible that other tasks complete "at the same time", which for asyncio means in the same iteration of the event loop. Consider, for example, the following code:

import asyncio

async def noop():
    pass

async def main():
    # recent Python versions require tasks (not bare coroutines) in asyncio.wait
    tasks = [asyncio.create_task(noop()) for _ in range(3)]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)
    print(len(done), len(pending))

asyncio.run(main())

This prints 3 0, just like your code. Why?

Each asyncio.create_task call submits a coroutine to the event loop, and asyncio.wait sets up callbacks to notify it when any of them is complete. However, the noop coroutine doesn't contain an await, so none of the calls to noop() ever suspends; each just does its thing and immediately returns. As a result, all three coroutine instances finish within the same pass of the event loop. wait is then informed that all three coroutines have finished, a fact it dutifully reports.

If you change noop to await a random sleep, e.g. change pass to await asyncio.sleep(0.1 * random.random()), you get the expected behavior. With an await the coroutines no longer complete at the same time, and wait will report the first one as soon as it detects it.
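As a minimal runnable sketch of that change (the random sleeps mean which task finishes first varies between runs, but done should contain just one task):

import asyncio
import random

async def noop():
    # Suspend for a random fraction of 0.1 s so the three tasks
    # no longer finish within the same event-loop pass.
    await asyncio.sleep(0.1 * random.random())

async def main():
    tasks = [asyncio.create_task(noop()) for _ in range(3)]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)
    print(len(done), len(pending))  # typically prints: 1 2

asyncio.run(main())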

This reveals the true underlying issue with your code: _perform_query doesn't await. This indicates that you are not using an async underlying library, or that you are using it incorrectly. The call to SearchSubtitles likely simply blocks the event loop, which appears to work in trivial tests, but breaks essential asyncio features such as concurrent execution of tasks.
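If SearchSubtitles is indeed a blocking XML-RPC call (the classic OpenSubtitles API is XML-RPC), one way to stop it from blocking the loop is to offload it to a worker thread so that _perform_query genuinely awaits. A minimal sketch, assuming Python 3.9+ for asyncio.to_thread:

import asyncio

class SubtitleDownloader:
    # ... __init__, opensubs_token, etc. elided ...

    async def _perform_query(self, payload, proxy):
        # Run the blocking XML-RPC call in a worker thread; the event
        # loop stays free to drive the other tasks in the meantime.
        return await asyncio.to_thread(
            proxy.SearchSubtitles, self.opensubs_token, [payload], {"limit": 25}
        )

On Python 3.8 and earlier, loop.run_in_executor(None, ...) does the same job. With the call offloaded, the tasks actually suspend while the network request runs, and return_when=asyncio.FIRST_COMPLETED behaves as the question expects.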

Upvotes: 2
