Evgen

Reputation: 397

Sequential version of asyncio.gather

I tried to write a function similar to asyncio.gather that runs a list of tasks sequentially rather than concurrently:

async def in_sequence(*tasks):
    """Executes tasks in sequence"""
    for task in tasks:
        await task

It was supposed to be used like this:

import asyncio

async def some_work(work_name, raise_exception=False):
    """Do some work"""
    print(f"Start {work_name}")
    await asyncio.sleep(1)
    if raise_exception:
        raise RuntimeError(f"{work_name} raise an exception")
    print(f"Finish {work_name}")

async def main():
    try:
        await asyncio.gather(
            some_work("work1"),         # work1, work2, in_sequence and work5 run concurrently
            some_work("work2"),
            in_sequence(
                some_work("work3"),     # work3 and work4 run in sequence
                some_work("work4")
            ),
            some_work("work5"),
        )
    except RuntimeError as error:
        print(error)                    # an exception raised at any point terminates the run

Everything worked fine until I tried to raise an exception in some_work:

async def main():
    try:
        await asyncio.gather(
            some_work("work1"),
            some_work("work2"),
            in_sequence(
                some_work("work3", raise_exception=True),       # raise an exception here
                some_work("work4")
            ),
            some_work("work5"),
        )
    except RuntimeError as error:
        print(error)

Immediately after that, I received the following warning:

RuntimeWarning: coroutine 'some_work' was never awaited

I read the documentation and continued to experiment:

async def in_sequence(*tasks):
    """Executes tasks in sequence"""
    _tasks = []
    for task in tasks:
        _tasks.append(asyncio.create_task(task))

    for _task in _tasks:
        await _task

And this version worked as expected!

This leaves me with the following questions:

  1. Why does the second version work and the first not?
  2. Does asyncio already have the tools to execute the list of tasks sequentially?
  3. Have I chosen the right implementation method or are there better options?

Upvotes: 1

Views: 3672

Answers (4)

Break

Reputation: 342

Taking inspiration from user4815162342's and Anton Pomieshchenko's solutions, I came up with this variation of it:

import inspect

async def in_sequence(*storm):
    twister = iter(storm)
    for task in twister:
        task = task()  # if it's a regular function, it's done here
        if inspect.isawaitable(task):
            try:
                await task  # if it's also awaitable, await it; errors propagate to the caller
            finally:
                if inspect.iscoroutine(task):
                    task.close()  # ensure the coroutine is closed

    assert not any(twister)  # optionally verify that the iterator is now empty

This way you can mix regular functions and coroutine functions in a single in_sequence call. Just make sure to call it like this:

await in_sequence(*[b.despawn, b.release])

Note the absence of () (that is, __call__()) after each name: otherwise the regular function would be called immediately, and the coroutine would trigger a RuntimeWarning for never having been awaited. (In my example, b.despawn is a coroutine function and b.release is a regular function.)

You can also do an additional check for callable(task) just before invoking task(), but it's up to you.

Upvotes: 1

user4815162342

Reputation: 155016

"And this version worked as expected!"

The problem with the second version is that it doesn't actually run the coroutines sequentially; it runs them concurrently. This is because asyncio.create_task() schedules the coroutine to run concurrently with the current coroutine. So when you await the tasks in a loop, you are actually allowing all of them to run while awaiting the first one. Despite appearances, the whole loop will take only as long as the longest task. (See here for more details.)
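To make the difference visible, here is a minimal sketch that records the order of events instead of timing them. The names job, await_tasks_in_loop and await_coroutines_in_loop are made up for this illustration and do not come from the question.

```python
import asyncio

async def job(name, delay, log):
    """Hypothetical worker that records when it starts and finishes."""
    log.append(f"start {name}")
    await asyncio.sleep(delay)
    log.append(f"finish {name}")

async def await_tasks_in_loop(log):
    # Same shape as the create_task-based in_sequence from the question:
    # both tasks are scheduled before the first await.
    tasks = [
        asyncio.create_task(job("slow", 0.05, log)),
        asyncio.create_task(job("fast", 0.01, log)),
    ]
    for task in tasks:
        await task

async def await_coroutines_in_loop(log):
    # Awaiting the bare coroutine objects runs them one after another.
    for coro in (job("slow", 0.05, log), job("fast", 0.01, log)):
        await coro

concurrent_log, sequential_log = [], []
asyncio.run(await_tasks_in_loop(concurrent_log))
asyncio.run(await_coroutines_in_loop(sequential_log))
print(concurrent_log)   # "fast" starts before "slow" has finished
print(sequential_log)   # "fast" starts only after "slow" has finished
```

In the task-based variant "fast" finishes before "slow" even though "slow" is awaited first, which shows that awaiting tasks in order does not make them run in order.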

The warning displayed by your first version is intended to prevent you from accidentally creating a coroutine that you never await, e.g. writing just asyncio.sleep(1) instead of await asyncio.sleep(1). As far as asyncio is concerned, main is instantiating coroutine objects and passing them to in_sequence which "forgets" to await some of them.

One way to suppress the warning is to wrap each remaining coroutine in a task and cancel it immediately, so that asyncio considers it handled. For example:

async def in_sequence(*coros):
    remaining = iter(coros)
    for coro in remaining:
        try:
            await coro
        except Exception:
            for c in remaining:
                asyncio.create_task(c).cancel()
            raise

Note that, by convention, a variable name that begins with an underscore marks an unused variable, so you shouldn't give such names to variables you actually use.

Upvotes: 1

Anton Pomieshchenko

Reputation: 2167

You said that the version of in_sequence with asyncio.create_task works, but I think it does not. From the docs:

Wrap the coro coroutine into a Task and schedule its execution. Return the Task object.

So it runs the coroutines concurrently, but you need them to run in sequence.

I experimented and found two ways to fix this.

Keep your original in_sequence function and add this code, which hides the warning:

import warnings
warnings.filterwarnings(
    'ignore',
    message=r'^coroutine .* was never awaited$',
    category=RuntimeWarning
)

Or fix the in_sequence function like this:

async def in_sequence(*tasks):
    for index, task in enumerate(tasks):
        try:
            await task
        except Exception:
            # Close the coroutines that were never started so they don't warn.
            for pending in tasks[index + 1:]:
                pending.close()
            raise
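As a quick check of the close()-based approach, the sketch below runs it with a failing first step. The log list and this variant of some_work (which appends to the list instead of printing) are assumptions made for the example.

```python
import asyncio

async def some_work(name, log, raise_exception=False):
    """Illustrative stand-in for the question's some_work; logs instead of printing."""
    log.append(f"start {name}")
    await asyncio.sleep(0.01)
    if raise_exception:
        raise RuntimeError(f"{name} raised an exception")
    log.append(f"finish {name}")

async def in_sequence(*coros):
    for index, coro in enumerate(coros):
        try:
            await coro
        except Exception:
            # The remaining coroutines were never started; closing them
            # discards them without running any of their code.
            for pending in coros[index + 1:]:
                pending.close()
            raise

async def main(log):
    try:
        await in_sequence(
            some_work("work3", log, raise_exception=True),
            some_work("work4", log),
        )
    except RuntimeError as error:
        log.append(f"caught: {error}")

log = []
asyncio.run(main(log))
print(log)
```

work4 never starts, and the RuntimeError from work3 reaches the caller unchanged.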

Answers to the other questions:

  1. That warning is emitted by the interpreter's C code when the last reference to a coroutine object goes away before the coroutine was ever awaited. This simple code (run in a terminal) shows the idea:

async def test():
    return 1

f = test()
f = None  # dropping the last reference triggers the warning

  2. I do not know.
  3. See above.
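The effect described in the first point can also be observed programmatically. This is a small sketch using the standard warnings module; the function name compute is made up, and the gc.collect() call is only there for interpreters that do not free objects as soon as the last reference is dropped.

```python
import gc
import warnings

async def compute():
    return 1

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # make sure the warning is not filtered out
    coro = compute()  # creates a coroutine object; nothing runs yet
    coro = None       # drop the last reference without awaiting it
    gc.collect()      # only matters on interpreters without reference counting

messages = [
    str(w.message) for w in caught if issubclass(w.category, RuntimeWarning)
]
print(messages)
```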

Upvotes: 2

Artemij Rodionov

Reputation: 1826

  1. The first version doesn't work because in_sequence doesn't catch the exception that can be raised by await task. The second works because create_task creates a future-like Task object that runs the coroutine. The object doesn't return or propagate the result of the wrapped coroutine by itself. When you await the object, the caller is suspended until the task has a result or an exception set, or until it is cancelled.

  2. It seems it doesn't.

  3. The second version executes the passed coroutines concurrently, so it is an incorrect implementation. If you really want to use an in_sequence function, you can:
    • Somehow delay the creation of coroutines.
    • Group sequential execution in an async function

e.g.:

async def in_sequence(*fn_and_args):
    for fn, args, kwargs in fn_and_args:
        await fn(*args, **kwargs)  # create a coro and await it in place

# Inside a coroutine:
await in_sequence(
    (some_work, ("work3",), {'raise_exception': True}),
    (some_work, ("work4",), {}),
)

# Or group the sequential calls in a dedicated async function:
async def in_sequence():
    await some_work("work3", raise_exception=True)
    await some_work("work4")
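Another way to delay the creation of coroutines, sketched here under the assumption that passing zero-argument callables is acceptable, is functools.partial. The log list and this logging variant of some_work are made up for the illustration.

```python
import asyncio
from functools import partial

async def some_work(name, log, raise_exception=False):
    """Illustrative stand-in for the question's some_work; logs instead of printing."""
    log.append(f"start {name}")
    await asyncio.sleep(0.01)
    if raise_exception:
        raise RuntimeError(f"{name} raised an exception")
    log.append(f"finish {name}")

async def in_sequence(*factories):
    # Each factory is called only when its turn comes, so a coroutine
    # object exists only for work that is actually about to be awaited.
    for factory in factories:
        await factory()

async def main(log):
    try:
        await in_sequence(
            partial(some_work, "work3", log, raise_exception=True),
            partial(some_work, "work4", log),
        )
    except RuntimeError as error:
        log.append(f"caught: {error}")

log = []
asyncio.run(main(log))
print(log)
```

Because the work4 coroutine is never created after work3 fails, there is nothing left unawaited and no cleanup is needed.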

Upvotes: 1
