Reputation: 46353
In the following code, a task A is created and added to a set of tasks, tasks. I then use await asyncio.wait(tasks) to wait for the tasks to finish. But this does not take into consideration task B1, which is created inside task A (recursive function call). As a result, the following code doesn't wait for B1 to finish: as the result below shows, task B1 never finishes.
I think the reason is that when tasks is evaluated on line (**), it still has only one element.
Question: how can I make await asyncio.wait(tasks) work on an evolving/growing set of tasks?
import asyncio

tasks = set()
i = 0

async def mytask(s):
    global i
    print('mytask %s starting' % s)
    await asyncio.sleep(1)
    if i < 4:  # limit number of tasks
        print('mytask %s creating new task' % s)
        i += 1
        tasks.add(asyncio.create_task(mytask('B%i' % i)))
    print('mytask %s len tasks:' % s, len(tasks))
    await asyncio.sleep(0.5)
    print('mystak %s finished' % s)

async def main():
    print('main starting')
    tasks.add(asyncio.create_task(mytask('A')))
    print('len tasks:', len(tasks))
    await asyncio.wait(tasks)  # (**)
    # await asyncio.sleep(10)
    print('main finished')

asyncio.run(main())
Result:
main starting
len tasks: 1
mytask A starting
mytask A creating new task
mytask A len tasks: 2
mytask B1 starting # <--- mytask B1 will never complete!
mystak A finished
main finished
If we replace line (**) with await asyncio.sleep(10), then, of course, all tasks complete:
main starting
len tasks: 1
mytask A starting
mytask A creating new task
mytask A len tasks: 2
mytask B1 starting
mystak A finished
mytask B1 creating new task
mytask B1 len tasks: 3
mytask B2 starting
mystak B1 finished
mytask B2 creating new task
mytask B2 len tasks: 4
mytask B3 starting
mystak B2 finished
mytask B3 creating new task
mytask B3 len tasks: 5
mytask B4 starting
mystak B3 finished
mytask B4 len tasks: 5
mystak B4 finished
main finished
Upvotes: 2
Views: 2924
Reputation: 12597
asyncio.wait is called with a set of one element: when it starts waiting there is no B task yet, and tasks added to the set afterwards are not picked up. In your case the simplest solution is just to await the B task inside A, but then A won't return until B is done.
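A small sketch to see this in isolation (the names child, parent and demo are hypothetical): a task is added to the shared set after asyncio.wait() has already been called, and wait() returns as soon as the one task it knew about is done, while the newer task is still running.

```python
import asyncio

async def demo():
    tasks = set()

    async def child():
        await asyncio.sleep(0.2)

    async def parent():
        # This runs after asyncio.wait() was called, so the child task is
        # added to `tasks` too late to be waited on.
        tasks.add(asyncio.create_task(child()))
        await asyncio.sleep(0.05)

    tasks.add(asyncio.create_task(parent()))
    done, pending = await asyncio.wait(tasks)  # only covers the parent task
    waited_on = len(done) + len(pending)
    child_done = all(t.done() for t in tasks)  # the child is still sleeping
    await asyncio.gather(*tasks)  # clean up before returning
    return waited_on, len(tasks), child_done

print(asyncio.run(demo()))  # (1, 2, False)
```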
If that doesn't suit you, you could use some kind of busy waiting: a loop that keeps waiting until every task in the set is done:
import asyncio

tasks = set()
i = 0

async def mytask(s):
    global i
    print('mytask %s starting' % s)
    await asyncio.sleep(1)
    if i < 4:  # limit number of tasks; without it, B tasks are spawned forever
        print('mytask %s create new task' % s)
        i += 1
        tasks.add(asyncio.create_task(mytask('B%i' % i)))
    print('mytask %s len tasks:' % s, len(tasks))
    await asyncio.sleep(0.5)
    print('mytask %s finished' % s)

async def main():
    print('main starting')
    tasks.add(asyncio.create_task(mytask('A')))
    print('len tasks:', len(tasks))
    # asyncio.wait() only covers the tasks present when it is called,
    # so call it again until no task in the growing set is pending.
    while True:
        if all(task.done() for task in tasks):
            break
        await asyncio.wait(tasks)
    print('main finished')

asyncio.run(main())
Keep in mind that busy waiting is often overused. Moreover, this amounts to implementing a task scheduler on top of asyncio's own task scheduler (which also runs a while True loop under the hood).
Another solution is to run the loop with run_forever instead of asyncio.run. This works well for long-running worker applications.
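loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.create_task(main())
loop.run_forever()  # never returns; the process keeps running after main() finishes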
You could also refactor the code to use asyncio.Queue and await Queue.join() until all items have been processed.
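A minimal sketch of that Queue-based refactoring, assuming the recursive "spawn a B task" step can be turned into "put the next item on the queue". The names process, worker, MAX_ITEMS and the state dict are hypothetical. It works because queue.join() counts every put() against a matching task_done(), so items enqueued while join() is already waiting are still covered.

```python
import asyncio

MAX_ITEMS = 4  # hypothetical limit, mirroring "if i < 4" in the question


async def process(name, queue, state):
    """Do the work for one item; may enqueue a follow-up item."""
    await asyncio.sleep(0.1)  # stand-in for the real work
    if state['spawned'] < MAX_ITEMS:
        state['spawned'] += 1
        # Instead of creating a task, put the next piece of work on the queue.
        queue.put_nowait('B%i' % state['spawned'])
    state['processed'].append(name)


async def worker(queue, state):
    while True:
        name = await queue.get()
        try:
            await process(name, queue, state)
        finally:
            # One task_done() per get(); otherwise queue.join() never returns.
            queue.task_done()


async def main(num_workers=2):
    queue = asyncio.Queue()
    state = {'spawned': 0, 'processed': []}
    queue.put_nowait('A')
    workers = [asyncio.create_task(worker(queue, state))
               for _ in range(num_workers)]
    # join() waits until every item ever put on the queue has been marked
    # done, including items added while join() was already waiting.
    await queue.join()
    # The workers loop forever, so stop them explicitly.
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return state['processed']


print(asyncio.run(main()))  # ['A', 'B1', 'B2', 'B3', 'B4']
```

A fixed pool of workers also caps how many items run at once, which the task-per-item version does not.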
Upvotes: 1
Reputation: 155495
To answer your question directly first: you can wait for a dynamic set of tasks with a loop, such as:
while tasks:
    prev_tasks = tasks.copy()
    # use gather() so exceptions are propagated rather than discarded
    await asyncio.gather(*tasks)
    tasks.difference_update(prev_tasks)
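For reference, here is a self-contained sketch that plugs this loop into a trimmed-down version of the question's code (prints removed, sleeps shortened). The finished list is a hypothetical addition that only records the order in which tasks complete.

```python
import asyncio

tasks = set()
finished = []  # hypothetical: records the order in which tasks complete
i = 0


async def mytask(s):
    global i
    await asyncio.sleep(0.1)
    if i < 4:  # limit number of tasks, as in the question
        i += 1
        tasks.add(asyncio.create_task(mytask('B%i' % i)))
    await asyncio.sleep(0.05)
    finished.append(s)


async def main():
    tasks.add(asyncio.create_task(mytask('A')))
    # Each pass waits for the tasks known right now; tasks created in the
    # meantime are still in the set afterwards, so the loop goes round again.
    while tasks:
        prev_tasks = tasks.copy()
        # use gather() so exceptions are propagated rather than discarded
        await asyncio.gather(*prev_tasks)
        tasks.difference_update(prev_tasks)
    print('main finished, ran:', finished)


asyncio.run(main())  # main finished, ran: ['A', 'B1', 'B2', 'B3', 'B4']
```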
But you probably don't need to do that. Instead, you can have each task wait for the subtask(s) it has created, along with its own work. That way you don't even need a global set of tasks, nor do you need to worry about waiting for all of them in main():
import asyncio

i = 0

async def mytask(s):
    global i
    print('mytask %s starting' % s)
    await asyncio.sleep(1)
    if i < 4:  # limit number of tasks
        print('mytask %s creating new task' % s)
        i += 1
        task = asyncio.create_task(mytask('B%i' % i))
    else:
        task = None
    print('mytask %s len tasks:' % s, i)
    await asyncio.sleep(0.5)  # our actual work
    print('mytask %s finished' % s)
    # after doing the work, wait for the child task if we created one
    if task is not None:
        await task

async def main():
    print('main starting')
    await mytask('A')
    # await asyncio.sleep(10)
    print('main finished')

asyncio.run(main())
Upvotes: 4