bluppfisk

Reputation: 2652

Asyncio: cancelling tasks and starting new ones when a signal flag is raised

My program is supposed to read data forever from provider classes stored in PROVIDERS, which is defined in the config. Every second, it should check whether the config has changed and, if so, stop all tasks, reload the config and create new tasks.

The code below raises CancelledError because I'm cancelling my tasks. Should I really wrap each of those in try/except to achieve my goal, or is there a better pattern?

async def main(config_file):
    load_config(config_file)

    tasks = []
    config_task = asyncio.create_task(watch_config(config_file))  # checks every 1s if config changed and raises ConfigChangedSignal if so

    tasks.append(config_task)
    for asset_name, provider in PROVIDERS.items():
        task = asyncio.create_task(provider.read_forever())
        tasks.append(task)

    try:
        await asyncio.gather(*tasks, return_exceptions=False)
    except ConfigChangedSignal:
        # Restarting
        for task in asyncio.tasks.all_tasks():
            task.cancel()  # raises CancelledError
        await main(config_file)


try:
    asyncio.run(main(config_file))
except KeyboardInterrupt:
    logger.debug("Ctrl-C pressed. Aborting")

Upvotes: 1

Views: 526

Answers (2)

fancidev

Reputation: 154

jsbueno’s answer is appropriate.

An easy alternative is to enclose the entire event loop in an outer “while”:

async def main(config_file):
    load_config(config_file)

    tasks = []
    for asset_name, provider in PROVIDERS.items():
        task = asyncio.create_task(provider.read_forever())
        tasks.append(task)

    try:
        await watch_config(config_file)
    except ConfigChangedSignal:
        pass

try:
    while True:
        asyncio.run(main(config_file))
except KeyboardInterrupt:
    logger.debug("Ctrl-C pressed. Aborting")
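
This works without any explicit cancel() calls because asyncio.run() cancels the tasks that are still pending when main() returns, before it closes the loop. A minimal sketch of that behaviour follows. The names read_forever, watch_config, ConfigChangedSignal, events and run_cycles are hypothetical stand-ins here, not the asker's real code, and the loop is bounded only so the example terminates:

```python
import asyncio

class ConfigChangedSignal(Exception):
    """Stand-in for the asker's signal exception."""

events = []  # records what happened, for illustration only

async def read_forever(name):
    # Stand-in for provider.read_forever()
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        events.append(f"{name} cancelled")
        raise  # re-raise so the cancellation completes

async def watch_config():
    # Stand-in: pretend the config changed after a short delay
    await asyncio.sleep(0.05)
    raise ConfigChangedSignal

async def main():
    # Keep references so the tasks are not garbage collected
    tasks = [asyncio.create_task(read_forever(n)) for n in ("a", "b")]
    try:
        await watch_config()
    except ConfigChangedSignal:
        events.append("config changed")
    # Returning here leaves the reader tasks pending;
    # asyncio.run() cancels them during its shutdown.

def run_cycles(n):
    for _ in range(n):  # bounded here; the answer uses `while True`
        asyncio.run(main())
    return events
```

Each cycle gets a fresh event loop, so nothing from the previous config survives into the next one.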

Upvotes: 0

jsbueno

Reputation: 110261

If you are on Python 3.11 or later, your pattern maps directly to asyncio.TaskGroup, the "successor" to asyncio.gather, which makes use of the new exception groups. By default, if any task in the group raises an exception, all other tasks in the group are cancelled.

I played around with this snippet in the IPython console, running asyncio.run(main(False)) for no exception and asyncio.run(main(True)) to induce one, just to check the results:

import asyncio

async def doit(i, n, cancel=False):
    await asyncio.sleep(n)
    if cancel:
        raise RuntimeError()
    print(i, "done")

async def main(cancel):
    try:
        async with asyncio.TaskGroup() as group:
            tasks = [group.create_task(doit(i, 2)) for i in range(10)]
            group.create_task(doit(42, 1, cancel=cancel))
            group.create_task(doit(11, .5))
    except Exception:
        pass
    await asyncio.sleep(3)

Your code can accommodate that. Apart from the best practice for cancelling tasks, though, you are making a recursive call to your main. Although that will work for most practical purposes, it can make seasoned developers go "sigh", it can break in edge cases (it will fail after ~1000 cycles, for example), and it leaks resources.

The correct way to do that is to use a while loop, since Python function calls, even tail calls, won't clean up the resources in the calling scope:

import asyncio
...


async def main(config_file):
    while True:
        load_config(config_file)
        try:
            async with asyncio.TaskGroup() as tasks:
                tasks.create_task(watch_config(config_file))  # checks every 1s if config changed and raises ConfigChangedSignal if so

                for asset_name, provider in PROVIDERS.items():
                    tasks.create_task(provider.read_forever())

            # all tasks are awaited at the end of the with block
        except *ConfigChangedSignal:  # <- the new syntax in Python 3.11
            # Restarting is just a matter of re-doing the while-loop
            # ... log.info("config changed")
            pass

        # any other exception won't be caught and will error, allowing one
        # to review what went wrong
        
...


For Python 3.10, looping over the tasks and cancelling each seems alright, but you should look at that recursive call. If you don't want a while loop inside your current main, refactor the code so that main itself is called from an outer while loop:



async def main(config_file):
    while True:
        await inner_main(config_file)

async def inner_main(config_file):
    load_config(config_file)

    # keep the existing body
    ...
    except ConfigChangedSignal:
        # Restarting
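        # cancel only the tasks created above; asyncio.all_tasks() would
        # also include the task that is running this coroutine
        for task in tasks:
            task.cancel()  # CancelledError is raised inside each task
        # await main call dropped from here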
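
One way to avoid a try/except in every task on Python 3.10 and earlier is to cancel only the tasks you created and then await them with asyncio.gather(..., return_exceptions=True), which returns each CancelledError as a result instead of raising it. A sketch under those assumptions follows. The names read_forever, watch_config, ConfigChangedSignal, run_once and max_restarts are hypothetical stand-ins:

```python
import asyncio

class ConfigChangedSignal(Exception):
    """Stand-in for the asker's signal exception."""

async def read_forever(name):
    # Stand-in for provider.read_forever()
    while True:
        await asyncio.sleep(0.01)

async def watch_config():
    # Stand-in: pretend the config changed after a short delay
    await asyncio.sleep(0.05)
    raise ConfigChangedSignal

async def run_once():
    tasks = [asyncio.create_task(watch_config())]
    tasks += [asyncio.create_task(read_forever(n)) for n in ("a", "b")]
    try:
        await asyncio.gather(*tasks)
    except ConfigChangedSignal:
        pass  # fall through to the cleanup below, then restart
    finally:
        # Cancel only our own tasks, never the task running this coroutine
        for task in tasks:
            task.cancel()
        # return_exceptions=True turns each CancelledError into a result
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

async def main(max_restarts=3):
    restarts = 0
    while restarts < max_restarts:  # the real program would use `while True`
        await run_once()
        restarts += 1
    return restarts
```

Because the cleanup sits in a finally block, any other exception still cancels the readers before it propagates.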


Upvotes: 2
