Sushant
Sushant

Reputation: 3669

Asyncio task cancellation

This is a test script I created to better understand task cancellation -

import asyncio
import random
import signal
import traceback

async def shutdown(signame, loop):
    print("Shutting down")
    tasks = [task for task in asyncio.Task.all_tasks()]
    for task in tasks:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            print("Task cancelled: %s", task)   
    loop.stop()

async def another():
    await asyncio.sleep(2)

async def some_other_process():
    await asyncio.sleep(5)
    return "Me"

async def process(job, loop, i):
    print(i)
    task = loop.create_task(some_other_process())
    value = await task

    if i < 1:
        another_task = loop.create_task(another())
        await another_task
    # await some_other_process()

def pull(loop):
    i = 0
    while True:
        job = f"random-integer-{random.randint(0, 100)}"
        try:
            loop.run_until_complete(process(job, loop, i))
            i += 1
        except asyncio.CancelledError as e:
            print("Task cancelled")
            break
        except Exception:
            print(traceback.format_exc())
    # asyncio.get_event_loop().stop()       


def main():
    try:
        loop = asyncio.get_event_loop()

        for signame in ['SIGINT']:
            loop.add_signal_handler(
                getattr(signal, signame),
                lambda: asyncio.ensure_future(shutdown(signame, loop))
            )

        try:
            pull(loop)
        except Exception:
            print(traceback.format_exc())
        finally:
            loop.close()
    finally:
        print("Done")

if __name__ == "__main__":
    main()

And I can not understand why I see -

Task was destroyed but it is pending!
task: <Task cancelling coro=<shutdown() done, defined at test.py:6>>

Upvotes: 2

Views: 1906

Answers (1)

Mikhail Gerasimov
Mikhail Gerasimov

Reputation: 39536

loop.add_signal_handler(
    getattr(signal, signame),
    lambda: asyncio.ensure_future(shutdown(signame, loop))
)

Here using asyncio.ensure_future you create task for shutdown coroutine, but you don't await anywhere for this task to be finished. Later when you close event loop it warns you this task is pending.


Upd:

If you want to do some clenup, the best place for it is right before loop.close() regardless of reason your script ended (signal, exception, etc.)

Try to alter your code this way:

# ...

async def shutdown(loop):  # remove `signal` arg

# ...

def main():
    try:
        loop = asyncio.get_event_loop()
        try:
            pull(loop)
        except Exception:
            print(traceback.format_exc())
        finally:
            loop.run_until_complete(shutdown(loop))  # just run until shutdown is done
            loop.close()
    finally:
        print("Done")

# ...

Upd2:

In case you still want signal handler, you probably want to do something like this:

from functools import partial

loop.add_signal_handler(
    getattr(signal, signame),
    partial(cb, signame, loop)
)

def cb(signame, loop):
    loop.stop()
    loop.run_until_complete(shutdown(signame, loop))

Upvotes: 1

Related Questions