Superbman

Reputation: 861

Python asyncio queue not showing any exceptions

  1. If I run this code, it hangs without throwing ZeroDivisionError.
  2. If I move await asyncio.gather(*tasks, return_exceptions=True) above await queue.join(), it finally throws ZeroDivisionError and stops.
  3. If I then comment out 1 / 0 and run it, everything executes, but the program hangs at the end.

Now the question is, how can I achieve both:

  1. Being able to see unexpected exceptions, as in case 2 above, and...
  2. Actually stopping when all tasks in the Queue are done


import asyncio
import random
import time


async def worker(name, queue):
    while True:
        print('Get a "work item" out of the queue.')
        sleep_for = await queue.get()

        print('Sleep for the "sleep_for" seconds.')
        await asyncio.sleep(sleep_for)

        # Error on purpose
        1 / 0

        print('Notify the queue that the "work item" has been processed.')
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')

async def main():
    print('Create a queue that we will use to store our "workload".')
    queue = asyncio.Queue()

    print('Generate random timings and put them into the queue.')
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    print('Create three worker tasks to process the queue concurrently.')
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    print('Wait until the queue is fully processed.')
    started_at = time.monotonic()

    print('Joining queue')
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    print('Cancel our worker tasks.')
    for task in tasks:
        task.cancel()

    print('Wait until all worker tasks are cancelled.')
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

asyncio.run(main())

Upvotes: 6

Views: 2034

Answers (2)

WcW

Reputation: 511

An ugly workaround: add a try/except/finally block inside async def worker(...). It catches any exception raised while processing an item, and the finally clause still calls queue.task_done(), so queue.join() does not wait forever.

Using the same code as in the question:

import asyncio
import random
import time

async def worker(name, queue):
    while True:
        try:
            ...
            1 / 0  # Error code
            ...
        except Exception as e:
            print(e)  # Show error
        finally:
            queue.task_done() # Make sure to clear the task

async def main():
    ...

asyncio.run(main())
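
For illustration, a fuller sketch of this workaround might look like the following. The errors list, the run_demo helper and the integer work items are assumptions made for the example, not part of the question's code. One deliberate difference: queue.get() stays outside the try block, so a worker that is cancelled while waiting for an item does not call task_done() for an item it never received.

```python
import asyncio


async def worker(name, queue, errors):
    while True:
        # Keep get() outside the try block: if the task is cancelled while
        # waiting here, task_done() must not run for an item never taken.
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for real work
            1 / item  # raises ZeroDivisionError when item == 0
        except Exception as e:
            # Record the error instead of letting it kill the worker.
            errors.append((name, item, e))
        finally:
            # Always mark the item as processed so queue.join() can return.
            queue.task_done()


async def run_demo(items):
    # Hypothetical helper: fills the queue, runs three workers, returns errors.
    queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)

    errors = []
    tasks = [
        asyncio.create_task(worker(f'worker-{i}', queue, errors))
        for i in range(3)
    ]

    await queue.join()

    # Workers loop forever, so cancel them once the queue is drained.
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return errors


if __name__ == '__main__':
    for name, item, exc in asyncio.run(run_demo([1, 0, 2, 0])):
        print(f'{name} failed on {item!r}: {exc!r}')
```

The trade-off is that a failing item is only reported, not propagated: the remaining items are still processed and main() never sees the exception unless it inspects errors afterwards.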

Upvotes: 0

user4815162342

Reputation: 154856

There are several ways to approach this, but the central idea is that in asyncio, unlike in classic threading, it is straightforward to await multiple things at once.

For example, you can await queue.join() and the worker tasks, whichever completes first. Since workers don't complete normally (you cancel them later), a worker completing means that it has raised.

# convert queue.join() to a full-fledged task, so we can test
# whether it's done
queue_complete = asyncio.create_task(queue.join())

# wait for the queue to complete or one of the workers to exit
await asyncio.wait([queue_complete, *tasks], return_when=asyncio.FIRST_COMPLETED)

if not queue_complete.done():
    # If the queue hasn't completed, it means one of the workers has
    # raised - find it and propagate the exception.  You can also
    # use t.exception() to get the exception object. Canceling other
    # tasks is another possibility.
    for t in tasks:
        if t.done():
            t.result()  # this will raise
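
Put together into a runnable sketch, the idea could look like this. The run_queue helper, the integer work items and the 1 / item failure are assumptions made for the example; the finally block that cancels everything is one possible cleanup choice, not the only one.

```python
import asyncio


async def worker(name, queue):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.01)  # stand-in for real work
        1 / item  # raises ZeroDivisionError when item == 0
        queue.task_done()


async def run_queue(items, n_workers=3):
    # Hypothetical helper: process all items, or raise the first worker error.
    queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)

    tasks = [
        asyncio.create_task(worker(f'worker-{i}', queue))
        for i in range(n_workers)
    ]
    # Wrap queue.join() in a task so it can be awaited alongside the workers.
    queue_complete = asyncio.create_task(queue.join())

    try:
        await asyncio.wait(
            [queue_complete, *tasks],
            return_when=asyncio.FIRST_COMPLETED,
        )
        if not queue_complete.done():
            # A worker finished first, which can only mean it raised.
            for t in tasks:
                if t.done():
                    t.result()  # re-raises the worker's exception
    finally:
        # Stop whatever is still running, on success and on failure alike.
        queue_complete.cancel()
        for t in tasks:
            t.cancel()
        await asyncio.gather(queue_complete, *tasks, return_exceptions=True)


if __name__ == '__main__':
    asyncio.run(run_queue([1, 2, 4]))  # completes normally
    try:
        asyncio.run(run_queue([1, 0, 2]))
    except ZeroDivisionError as exc:
        print(f'worker failed: {exc!r}')
```

With this shape, main() both stops when the queue is drained and surfaces an unexpected exception as soon as any worker raises.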

Upvotes: 7
