Basj

How to use asyncio.wait on a growing set of tasks?

In the following code, a task A is created and added to the set of tasks, tasks.
I then use await asyncio.wait(tasks) to wait for the tasks to finish.

But this does not take into account task B1, which is created inside task A (a recursive function call).

As a result, the following code doesn't wait for B1 to finish: as the result below shows, task B1 never finishes.

I think the reason is that when tasks is evaluated on line (**), it still contains only one element.

Question: how to make await asyncio.wait(tasks) work on an evolving/growing set of tasks?

import asyncio

tasks = set()
i = 0

async def mytask(s):
    global i
    print('mytask %s starting' % s)
    await asyncio.sleep(1)
    if i < 4:    # limit number of tasks
        print('mytask %s creating new task' % s)
        i += 1
        tasks.add(asyncio.create_task(mytask('B%i' % i)))
    print('mytask %s len tasks:' % s, len(tasks))
    await asyncio.sleep(0.5)
    print('mystak %s finished' % s)

async def main():
    print('main starting')
    tasks.add(asyncio.create_task(mytask('A')))
    print('len tasks:', len(tasks))
    await asyncio.wait(tasks)            # (**)
    # await asyncio.sleep(10)
    print('main finished')

asyncio.run(main())

Result:

main starting
len tasks: 1
mytask A starting
mytask A creating new task
mytask A len tasks: 2
mytask B1 starting       # <--- mytask B1 will never complete!
mystak A finished
main finished

If line (**) is replaced by await asyncio.sleep(10), all tasks complete, of course:

main starting
len tasks: 1
mytask A starting
mytask A creating new task
mytask A len tasks: 2
mytask B1 starting
mystak A finished
mytask B1 creating new task
mytask B1 len tasks: 3
mytask B2 starting
mystak B1 finished
mytask B2 creating new task
mytask B2 len tasks: 4
mytask B3 starting
mystak B2 finished
mytask B3 creating new task
mytask B3 len tasks: 5
mytask B4 starting
mystak B3 finished
mytask B4 len tasks: 5
mystak B4 finished
main finished
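The hypothesis about line (**) can be checked directly. The minimal sketch below (the names early and late are made up for the demonstration) adds a task to the set after asyncio.wait() has already been called. wait() returns as soon as the task that existed at call time is done, while the late task is still running.

```python
import asyncio


async def main():
    tasks = set()
    order = []

    async def late():
        await asyncio.sleep(0.2)
        order.append('late done')

    async def early():
        await asyncio.sleep(0.05)
        # this task is added *after* asyncio.wait() has been called
        tasks.add(asyncio.create_task(late()))
        order.append('early done')

    tasks.add(asyncio.create_task(early()))
    done, pending = await asyncio.wait(tasks)
    order.append('wait returned')
    counts = (len(done), len(pending), len(tasks))
    await asyncio.gather(*tasks)  # let the late task finish before exiting
    return counts, order


counts, order = asyncio.run(main())
print(counts)  # (1, 0, 2)
print(order)   # ['early done', 'wait returned', 'late done']
```

wait() reports one done task and zero pending ones even though the set now holds two tasks, which is consistent with wait() working on the tasks it was given at call time.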

Answers (2)

kwarunek

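asyncio.wait is called with a set of one element. At the moment it starts processing, no B task exists yet, and asyncio.wait copies the collection it receives, so tasks added to the set later are never seen. In your case the simplest solution is to await the B task inside A, but then A won't return until B is done.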

If that doesn't suit you, you could use a kind of busy waiting: a loop that keeps checking whether every task in the set is done:

import asyncio

tasks = set()
i = 0


async def mytask(s):
    global i
    print('mytask %s starting' % s)
    await asyncio.sleep(1)
    if i < 4:    # limit number of tasks, otherwise the chain never ends
        print('mytask %s creating new task' % s)
        i += 1
        tasks.add(asyncio.create_task(mytask('B%i' % i)))
    print('mytask %s len tasks:' % s, len(tasks))
    await asyncio.sleep(0.5)
    print('mytask %s finished' % s)

async def main():
    print('main starting')
    tasks.add(asyncio.create_task(mytask('A')))
    print('len tasks:', len(tasks))
    # asyncio.wait() only sees the tasks present when it is called,
    # so re-check the (possibly grown) set after each wait
    while not all(task.done() for task in tasks):
        await asyncio.wait(tasks)
    print('main finished')

asyncio.run(main())

Keep in mind that busy waiting is often overused. Moreover, it amounts to implementing a task scheduler on top of asyncio's own task scheduler (which also runs a while True loop under the hood).

Another solution is to run the loop with run_forever() instead of asyncio.run(). That fits long-running worker applications.

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.create_task(main())
loop.run_forever()    # never returns on its own; stop it with loop.stop() or Ctrl+C

You could also refactor the code to use asyncio.Queue and await Queue.join() until all items have been processed.
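A minimal sketch of that Queue-based refactor, under assumptions that are not part of the answer: the recursion depth replaces the global counter i, the names worker, MAX_DEPTH and NUM_WORKERS are hypothetical, and short sleeps stand in for real work. The important detail is that each worker enqueues the follow-up item before calling task_done(), so the queue's count of unfinished items never drops to zero while new work is still being generated.

```python
import asyncio

MAX_DEPTH = 4    # hypothetical: plays the role of the "i < 4" limit
NUM_WORKERS = 3  # hypothetical number of concurrent workers


async def worker(queue, finished):
    while True:
        s, depth = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for the real work
            if depth < MAX_DEPTH:
                # instead of creating a task, put a new work item on the queue
                queue.put_nowait(('B%i' % (depth + 1), depth + 1))
            finished.append(s)
        finally:
            # mark the item done only after any follow-up item was enqueued
            queue.task_done()


async def main():
    queue = asyncio.Queue()
    finished = []
    queue.put_nowait(('A', 0))
    workers = [asyncio.create_task(worker(queue, finished))
               for _ in range(NUM_WORKERS)]
    # join() returns once every item ever put on the queue is marked done,
    # including items added while other items were being processed
    await queue.join()
    # the workers are now idle in queue.get(), so shut them down
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return finished


finished = asyncio.run(main())
print(finished)  # ['A', 'B1', 'B2', 'B3', 'B4']
```

Because the workers loop forever, they have to be cancelled explicitly once join() returns; gather(..., return_exceptions=True) collects the resulting cancellations without raising.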

user4815162342

To answer your question directly first: you can wait for a dynamic set of tasks with a loop such as:

while tasks:
    prev_tasks = tasks.copy()
    # use gather() so exceptions are propagated rather than discarded
    await asyncio.gather(*tasks)
    tasks.difference_update(prev_tasks)
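For reference, here is that loop dropped into a trimmed version of the question's code. The shortened sleeps and the finished list that replaces the prints are assumptions made for a quick run. Each pass awaits a snapshot of the set and then removes only that snapshot, so tasks added in the meantime are picked up on the next pass.

```python
import asyncio

tasks = set()
finished = []  # hypothetical: records completion order instead of printing
i = 0


async def mytask(s):
    global i
    await asyncio.sleep(0.01)
    if i < 4:  # limit number of tasks, as in the question
        i += 1
        tasks.add(asyncio.create_task(mytask('B%i' % i)))
    await asyncio.sleep(0.01)
    finished.append(s)


async def main():
    tasks.add(asyncio.create_task(mytask('A')))
    while tasks:
        prev_tasks = tasks.copy()
        # gather() propagates the first exception instead of discarding it
        await asyncio.gather(*prev_tasks)
        # drop only the tasks just awaited; tasks added meanwhile remain
        tasks.difference_update(prev_tasks)


asyncio.run(main())
print(finished)  # ['A', 'B1', 'B2', 'B3', 'B4']
```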

But you probably don't need to do that. Instead, you can have each task wait for the subtask(s) it has created, in addition to doing its own work. That way you don't even need a global set of tasks, nor do you need to worry about waiting for all of them in main():

import asyncio

i = 0

async def mytask(s):
    global i
    print('mytask %s starting' % s)
    await asyncio.sleep(1)
    if i < 4:    # limit number of tasks
        print('mytask %s creating new task' % s)
        i += 1
        task = asyncio.create_task(mytask('B%i' % i))
    else:
        task = None
    print('mytask %s len tasks:' % s, i)
    await asyncio.sleep(0.5)   # our actual work
    print('mytask %s finished' % s)
    # after doing the work, wait for the child task if we created one
    if task is not None:
        await task

async def main():
    print('main starting')
    await mytask('A')
    # await asyncio.sleep(10)
    print('main finished')

asyncio.run(main())
