Reputation: 861
If I add 1 / 0 to the worker and run, it will execute everything, but will hang in the end. If I put await asyncio.gather(*tasks, return_exceptions=True) above await queue.join(), it will finally throw ZeroDivisionError and stop. Now the question is, how can I achieve both?
import asyncio
import random
import time

async def worker(name, queue):
    while True:
        print('Get a "work item" out of the queue.')
        sleep_for = await queue.get()
        print('Sleep for the "sleep_for" seconds.')
        await asyncio.sleep(sleep_for)
        # Error on purpose
        1 / 0
        print('Notify the queue that the "work item" has been processed.')
        queue.task_done()
        print(f'{name} has slept for {sleep_for:.2f} seconds')

async def main():
    print('Create a queue that we will use to store our "workload".')
    queue = asyncio.Queue()
    print('Generate random timings and put them into the queue.')
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)
    print('Create three worker tasks to process the queue concurrently.')
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)
    print('Wait until the queue is fully processed.')
    started_at = time.monotonic()
    print('Joining queue')
    await queue.join()
    total_slept_for = time.monotonic() - started_at
    print('Cancel our worker tasks.')
    for task in tasks:
        task.cancel()
    print('Wait until all worker tasks are cancelled.')
    await asyncio.gather(*tasks, return_exceptions=True)
    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

asyncio.run(main())
Upvotes: 6
Views: 2034
Reputation: 511
An ugly workaround: add a try/except/finally block inside async def worker(...). It catches any exception raised in the worker and still calls queue.task_done(), so queue.join() does not wait forever.
Using the same code as in the question:
import asyncio
import random
import time

async def worker(name, queue):
    while True:
        try:
            ...
            1 / 0  # Error code
            ...
        except Exception as e:
            print(e)  # Show error
        finally:
            queue.task_done()  # Make sure to clear the task

async def main():
    ...

asyncio.run(main())
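For reference, here is a minimal runnable sketch of this workaround, not the exact code from the question. It assumes a shortened workload (5 items of 0.01 seconds each) and a hypothetical errors list that records failures instead of only printing them. It also assumes queue.get() is kept outside the try block, so that task_done() is never called for an item that was not actually retrieved (for example when the worker is cancelled while waiting on an empty queue).

```python
import asyncio


async def worker(name, queue, errors):
    while True:
        # Fetch outside the try block: task_done() must only be called
        # for items that were actually taken from the queue.
        sleep_for = await queue.get()
        try:
            await asyncio.sleep(sleep_for)
            1 / 0  # error on purpose, as in the question
        except Exception as e:
            # Record the failure (hypothetical "errors" list) and keep going.
            errors.append((name, e))
        finally:
            # Always mark the item as processed so queue.join() can return.
            queue.task_done()


async def main():
    queue = asyncio.Queue()
    for _ in range(5):
        queue.put_nowait(0.01)  # shortened workload for the sketch

    errors = []
    tasks = [asyncio.create_task(worker(f'worker-{i}', queue, errors))
             for i in range(3)]

    await queue.join()  # returns because every item is marked done

    # The workers loop forever, so cancel them and wait for the cancellation.
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return errors


errors = asyncio.run(main())
print(f'{len(errors)} work items failed')
```

The trade-off is that the failure no longer stops the run: every item is attempted and the errors only become visible afterwards, which may or may not be what you want.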
Upvotes: 0
Reputation: 154856
There are several ways to approach this, but the central idea is that in asyncio, unlike in classic threading, it is straightforward to await multiple things at once.
For example, you can await queue.join() and the worker tasks, whichever completes first. Since workers don't complete normally (you cancel them later), a worker completing means that it has raised.
# convert queue.join() to a full-fledged task, so we can test
# whether it's done
queue_complete = asyncio.create_task(queue.join())
# wait for the queue to complete or one of the workers to exit
await asyncio.wait([queue_complete, *tasks], return_when=asyncio.FIRST_COMPLETED)
if not queue_complete.done():
    # If the queue hasn't completed, it means one of the workers has
    # raised - find it and propagate the exception. You can also
    # use t.exception() to get the exception object. Canceling other
    # tasks is another possibility.
    for t in tasks:
        if t.done():
            t.result()  # this will raise
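Put together with the worker from the question, the snippet above could look roughly like the following sketch. It assumes a shortened workload (5 items of 0.01 seconds each), and the try/finally cleanup that cancels the remaining tasks is one possible choice rather than the only way to do it; the outcome variable is only there for illustration.

```python
import asyncio


async def worker(name, queue):
    while True:
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        1 / 0  # error on purpose, as in the question
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    for _ in range(5):
        queue.put_nowait(0.01)  # shortened workload for the sketch

    tasks = [asyncio.create_task(worker(f'worker-{i}', queue))
             for i in range(3)]
    queue_complete = asyncio.create_task(queue.join())
    try:
        # Wait for the queue to drain or for any worker to exit.
        await asyncio.wait([queue_complete, *tasks],
                           return_when=asyncio.FIRST_COMPLETED)
        if not queue_complete.done():
            # A worker exited early, which means it raised.
            for t in tasks:
                if t.done():
                    t.result()  # re-raises the worker's exception
    finally:
        # Cancel whatever is still running and collect the results, so no
        # task is left pending and no exception goes unretrieved.
        for t in (queue_complete, *tasks):
            t.cancel()
        await asyncio.gather(queue_complete, *tasks, return_exceptions=True)


try:
    asyncio.run(main())
    outcome = 'queue fully processed'
except ZeroDivisionError as e:
    outcome = f'worker failed: {e!r}'
print(outcome)
```

With the 1 / 0 line removed, the same main() should take the other branch: queue.join() completes first, the workers are cancelled in the finally block, and no exception is propagated.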
Upvotes: 7