Reputation: 1106
I want for asyncio.gather
to immediately raise any exception except for some particular exception class, which should be instead returned in the results list. Right now, I just slightly modified the canonical implementation of asyncio.gather
in CPython and use that, but I wonder if there is not a more canonical way to do it.
Upvotes: 2
Views: 1337
Reputation: 155296
You can implement such semantics using the more powerful asyncio.wait
primitive and its return_when=asyncio.FIRST_EXCEPTION
option:
async def xgather(*coros, allowed_exc):
results = {}
pending = futures = list(map(asyncio.ensure_future, coros))
while pending:
done, pending = await asyncio.wait(
pending, return_when=asyncio.FIRST_EXCEPTION)
for fut in done:
try:
results[fut] = fut.result()
except allowed_exc as e:
results[fut] = e
return [results[fut] for fut in futures]
The idea is to call wait
until either all futures are done or an exception is observed. The exception is in turn either stored or propagated, depending on whether it matches allowed_exc
. If all the results and allowed exceptions have been successfully collected, they are returned in the correct order, as with asyncio.gather
.
The approach of modifying the implementation of asyncio.gather
might easily fail on a newer Python version, since the code accesses private attributes of Future
objects. Also, alternative event loops like uvloop
could make their gather
and wait
more efficient, which would automatically benefit an xgather
based on the public API.
Test code:
import asyncio
async def fail():
1/0
async def main():
print(await xgather(asyncio.sleep(1), fail(), allowed_exc=OSError))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
When run, the code raises immediately, which is expected ZeroDivisionError
doesn't match the allowed OSError
exception. Changing OSError
to ZeroDivisionError
causes the code to sleep for 1 second and output [None, ZeroDivisionError('division by zero',)]
.
Upvotes: 2