Abionics

Reputation: 337

Python CancelledError with asyncio queue

I use the code from this answer, but I get asyncio.exceptions.CancelledError when the queue is empty. In my real project, consumers add tasks to the queue, which is why I use a while True loop.

I reduced the code to make it easier to debug:

import asyncio
import traceback


async def consumer(queue: asyncio.Queue):
    try:
        while True:
            number = await queue.get()  # here is exception
            queue.task_done()
            print(f'consumed {number}')
    except BaseException:
        traceback.print_exc()


async def main():
    queue = asyncio.Queue()
    for i in range(3):
        await queue.put(i)
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(1)]
    await queue.join()
    for c in consumers:
        c.cancel()


asyncio.run(main())

And the error:

consumed 0
consumed 1
consumed 2
Traceback (most recent call last):
  File "/Users/abionics/Downloads/BaseAsyncScraper/ttt.py", line 8, in consumer
    number = await queue.get()
  File "/usr/local/Cellar/python@3.9/3.9.4/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/queues.py", line 166, in get
    await getter
asyncio.exceptions.CancelledError

By the way, the documentation for queue.get() says: "If queue is empty, wait until an item is available." What is the real reason for this error? Is there a better solution?

Upvotes: 1

Views: 5385

Answers (1)

HTF

Reputation: 7270

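The error occurs because you cancel the task yourself. queue.join() returns once every item has been marked done, and by then the consumer is already waiting in queue.get() on an empty queue. Calling c.cancel() raises CancelledError at that await, and your except BaseException block prints the traceback.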

Task.cancel:

Request the Task to be cancelled.

This arranges for a CancelledError exception to be thrown into the wrapped coroutine on the next cycle of the event loop.
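
To see this in isolation, here is a minimal sketch. The names waiter, demo and events are hypothetical and not part of the question's code. A task blocked in queue.get() receives CancelledError at that await once it is cancelled:

```python
import asyncio


async def waiter(queue: asyncio.Queue, events: list) -> None:
    try:
        await queue.get()  # blocks: nothing is ever put on this queue
    except asyncio.CancelledError:
        events.append("cancelled inside get()")
        raise  # re-raise so the task is actually marked as cancelled


async def demo() -> list:
    events = []
    task = asyncio.create_task(waiter(asyncio.Queue(), events))
    await asyncio.sleep(0)  # yield once so the task starts and blocks in get()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append("task finished as cancelled")
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

So the exception is not a failure of queue.get() itself; it is how the cancellation request is delivered to the waiting coroutine.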

You have a few options to handle this:

1. Use asyncio.gather

If return_exceptions is True, exceptions are treated the same as successful results, and aggregated in the result list.

await queue.join()

for c in consumers:
    c.cancel()

await asyncio.gather(*consumers, return_exceptions=True)
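
Put together, a complete version of this option might look like the sketch below. The seen list and the number of consumers are assumptions added only to make the result observable:

```python
import asyncio


async def consumer(queue: asyncio.Queue, seen: list) -> None:
    while True:
        number = await queue.get()
        seen.append(number)
        queue.task_done()


async def main() -> tuple:
    queue = asyncio.Queue()
    for i in range(3):
        await queue.put(i)
    seen = []
    consumers = [asyncio.create_task(consumer(queue, seen)) for _ in range(2)]
    await queue.join()  # returns once every item has been marked done
    for c in consumers:
        c.cancel()
    # Wait for the cancelled tasks to finish; with return_exceptions=True
    # each CancelledError is returned as a result instead of being raised.
    results = await asyncio.gather(*consumers, return_exceptions=True)
    return seen, results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here the consumers contain no exception handling at all; the cancellation is collected in main() instead.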

2. Catch the exception in the consumer

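import asyncio


async def consumer(q):
    while True:
        try:
            # The cancellation is raised here, while waiting for an item
            num = await q.get()
        except asyncio.CancelledError:
            print("Exiting...")
            break
        print(f"Working on: {num}")
        # Only mark an item done if one was actually retrieved
        q.task_done()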

3. Suppress the exception

import asyncio
from contextlib import suppress


async def consumer(q):
    # CancelledError raised at q.get() leaves the loop silently
    with suppress(asyncio.CancelledError):
        while True:
            num = await q.get()
            print(f"Working on: {num}")
            q.task_done()
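
Since the question mentions consumers that add tasks to the queue, here is a hedged sketch of how that could fit with this option. The follow-up rule (enqueue num - 1 while num > 0) and the done list are made up for illustration. The one detail that matters is calling put before task_done(), so that join() does not return while follow-up work is still pending:

```python
import asyncio
from contextlib import suppress


async def consumer(q: asyncio.Queue, done: list) -> None:
    with suppress(asyncio.CancelledError):
        while True:
            num = await q.get()
            if num > 0:
                # Hypothetical follow-up work: enqueue it before task_done()
                # so the unfinished-task counter never drops to zero early.
                q.put_nowait(num - 1)
            done.append(num)
            q.task_done()


async def main() -> list:
    q = asyncio.Queue()
    q.put_nowait(3)
    done = []
    workers = [asyncio.create_task(consumer(q, done)) for _ in range(2)]
    await q.join()  # waits for the initial item and all follow-ups
    for w in workers:
        w.cancel()
    # The consumers suppress CancelledError, so they finish normally
    await asyncio.gather(*workers)
    return done


if __name__ == "__main__":
    print(asyncio.run(main()))
```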

Upvotes: 5
