Reputation: 890
The documentation for asyncio.gather says that:
If return_exceptions is False (default), the first raised exception is immediately propagated to the task that awaits on gather(). Other awaitables in the aws sequence won’t be cancelled and will continue to run.
However, a simple test suggests that if one of the tasks raises an exception while return_exceptions is False, all other awaitables are cancelled (or, to be more precise in case I have the terminology wrong, the other awaitables do not finish their job):
import asyncio

async def factorial(name, number, raise_exception=False):
    # If raise_exception is True, will raise an exception when
    # the loop counter > 3
    f = 1
    for i in range(2, number + 1):
        print(f' Task {name}: Compute factorial({i})...')
        if raise_exception and i > 3:
            print(f' Task {name}: raising Exception')
            raise Exception(f'Bad Task {name}')
        await asyncio.sleep(1)
        f *= i
    print(f'==>> Task {name} DONE: factorial({number}) = {f}')
    return f

async def main():
    tasks = [factorial('A', 5),  # this will not be finished
             factorial('B', 10, raise_exception=True),
             factorial('C', 2)]
    try:
        results = await asyncio.gather(*tasks)
        print('Results:', results)
    except Exception as e:
        print('Got an exception:', e)

asyncio.run(main())
To put it simply, this code defines 3 tasks and calls asyncio.gather() on them. One of the tasks raises an exception before one of the others is done, and that other task never finishes.
Actually, I cannot even make sense of what the documentation says: if an exception is raised and caught by the task awaiting on gather, I would not be able to get the returned results at all (even if the other task did, somehow, get done).
Am I missing anything, or is there a problem with the documentation?
This was tested with Python 3.7.2.
Upvotes: 47
Views: 45062
Reputation: 6789
The gather function is primarily designed for the Scatter-Gather pattern. In this pattern, when you need to calculate something from all task results (e.g. in an aggregation function), the gather function is handy. The switch return_exceptions controls whether a single failure should invalidate the aggregated result.
By default, any failure will be immediately propagated to the gather task. This behavior is desired in aggregation tasks like sum(await gather(*tasks)) because the sum becomes invalid due to a single failure.
However, no task will be cancelled or altered in any way by gather. It is merely one observable, waiting and gathering information quietly.
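As a rough illustration of that switch, the sketch below aggregates three results with and without return_exceptions. The worker coroutine, its values, and the two helper functions are made up for this example; only asyncio.gather and its return_exceptions parameter come from the library.

```python
import asyncio

async def worker(value, fail=False):
    # Hypothetical worker: returns its value, or raises if asked to.
    await asyncio.sleep(0.01)
    if fail:
        raise ValueError(f'worker {value} failed')
    return value

async def strict_sum():
    # Default (return_exceptions=False): the first failure propagates,
    # so the aggregation never sees a partial result.
    try:
        return sum(await asyncio.gather(worker(1), worker(2, fail=True), worker(3)))
    except ValueError as e:
        return f'aggregation aborted: {e}'

async def tolerant_sum():
    # return_exceptions=True: failures come back as items of the result
    # list, in the same order as the inputs, and can be filtered out.
    results = await asyncio.gather(worker(1), worker(2, fail=True), worker(3),
                                   return_exceptions=True)
    return sum(r for r in results if not isinstance(r, Exception))

strict_result = asyncio.run(strict_sum())
tolerant_result = asyncio.run(tolerant_sum())
print(strict_result)
print(tolerant_result)
```

In the strict variant the sum is never computed, while the tolerant variant adds up only the values that were actually produced. Which one is appropriate depends on whether a partial aggregate is meaningful for the job at hand.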
Your question 1:
I cannot even make sense of what the documentation says - if an exception is raised ..., I would not even be able to get the returned results
Usually you can, and need to, define multiple observables in a Scatter-Gather job. After one gather fails, the main routine should continue with another gather, wait, as_completed, or asyncio.sleep.
Remember that the tasks are still running and the Future objects are still there. By design, Scatter-Gather tasks should be independent, and the gather operation should not have any side effects, so you can continue to handle those awaitables independently, e.g. by querying tasks[i].result() (when the entries are Task objects) or explicitly cancelling them.
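A minimal sketch of that idea, assuming the coroutines are wrapped in Task objects with asyncio.create_task() so that they can be inspected later (the question's code passes bare coroutines, which gather wraps internally). The job coroutine, its names, and its delays are invented for illustration.

```python
import asyncio

async def job(name, delay, fail=False):
    # Hypothetical job: sleeps, then either fails or returns its name.
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f'job {name} failed')
    return name

async def main():
    tasks = [asyncio.create_task(job('A', 0.2)),
             asyncio.create_task(job('B', 0.05, fail=True)),
             asyncio.create_task(job('C', 0.01))]
    try:
        await asyncio.gather(*tasks)
    except RuntimeError as e:
        print('gather raised:', e)

    # gather has propagated the failure, but it did not cancel anything.
    still_running = [t for t in tasks if not t.done()]
    print('still running:', len(still_running))

    # A second observable: wait for whatever is left, then query each task.
    if still_running:
        await asyncio.wait(still_running)
    outcome = {}
    for name, t in zip('ABC', tasks):
        # exception() is None for a task that finished normally.
        outcome[name] = t.exception() or t.result()
    return outcome

outcome = asyncio.run(main())
print(outcome)
```

Job A is still running when gather raises, yet it finishes normally once something else waits for it, and its result can then be read from the Task object.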
Question 2:
Am I missing anything, or is there a problem with the documentation?
In your test program, no other tasks (observables) are defined after catching the exception, so the main program simply exits, which gives you the illusion that all tasks are cancelled. They are indeed cancelled, but only when the event loop is closed by asyncio.run(main()), some milliseconds after the exception is caught.
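The ordering can be made visible with a small sketch like the following, in which a worker records the moment it receives the cancellation. The events list and both worker coroutines are hypothetical names used only for this demonstration.

```python
import asyncio

events = []  # filled in as things happen so the order can be inspected

async def slow_worker():
    try:
        await asyncio.sleep(10)
        events.append('slow worker finished')
    except asyncio.CancelledError:
        # Raised inside the coroutine when its task is cancelled.
        events.append('slow worker cancelled')
        raise

async def failing_worker():
    await asyncio.sleep(0.01)
    raise RuntimeError('boom')

async def main():
    try:
        await asyncio.gather(slow_worker(), failing_worker())
    except RuntimeError:
        events.append('gather raised')
    events.append('main returned')

asyncio.run(main())
print(events)
```

The cancellation entry is recorded only after main() has returned, i.e. it comes from the cleanup that asyncio.run() performs on leftover tasks, not from gather itself.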
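By adding another waiting step at the end of main(), either await asyncio.wait(tasks) (which needs the entries to be Task objects, e.g. created with asyncio.create_task(), rather than bare coroutines) or simply await asyncio.sleep(20), those worker tasks will have their chance to complete. Thus the documentation is correct.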
Your test program can be considered a DAG of calculation tasks. The gather task is your root target passed to asyncio.run. Therefore, when the only mission fails, all sub-tasks are aborted.
-- updated in 2023
original answer
I've run your code and got the following output, as expected from the documentation.
Task C: Compute factorial(2)...
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
==>> Task C DONE: factorial(2) = 2
Task A: Compute factorial(3)...
Task B: Compute factorial(3)...
Task A: Compute factorial(4)...
Task B: Compute factorial(4)...
Task B: raising Exception
Got an exception: Bad Task B
Task A: Compute factorial(5)...
==>> Task A DONE: factorial(5) = 120
await asyncio.gather() raises as soon as task B fails, and print('Got an exception:', e) writes the message to the screen. As @deceze commented, your program exited immediately after the exception was caught and main() returned. Thus, tasks A and C are terminated because the entire process dies, not because of cancellation.
To fix it, add await asyncio.sleep(20) to the end of the main() function.
Upvotes: 30
Reputation: 14360
The answer to the main question here is to use asyncio.as_completed. Change your main() function to:
async def main():
    tasks = [factorial('A', 5),  # now runs to completion
             factorial('B', 10, raise_exception=True),
             factorial('C', 2)]
    # Handle results in the order the tasks are completed;
    # if one raises an exception, you can handle that as well.
    for coroutine in asyncio.as_completed(tasks):
        try:
            results = await coroutine
        except Exception as e:
            print('Got an exception:', e)
        else:
            print('Results:', results)
Upvotes: 6