Reputation: 397
I tried to create a method similar to asyncio.gather, but one that executes the list of tasks sequentially rather than concurrently:
async def in_sequence(*tasks):
    """Executes tasks in sequence"""
    for task in tasks:
        await task
This method was then supposed to be used like this:
async def some_work(work_name, raise_exception=False):
    """Do some work"""
    print(f"Start {work_name}")
    await asyncio.sleep(1)
    if raise_exception:
        raise RuntimeError(f"{work_name} raise an exception")
    print(f"Finish {work_name}")
async def main():
    try:
        await asyncio.gather(
            some_work("work1"),  # work1, work2, in_sequence and work5 run concurrently
            some_work("work2"),
            in_sequence(
                some_work("work3"),  # work3 and work4 run in sequence
                some_work("work4")
            ),
            some_work("work5"),
        )
    except RuntimeError as error:
        print(error)  # raise an exception at any point to terminate
And everything worked fine until I tried to throw an exception in some_work:
async def main():
    try:
        await asyncio.gather(
            some_work("work1"),
            some_work("work2"),
            in_sequence(
                some_work("work3", raise_exception=True),  # raise an exception here
                some_work("work4")
            ),
            some_work("work5"),
        )
    except RuntimeError as error:
        print(error)
Immediately after that, I received the following error message:
RuntimeWarning: coroutine 'some_work' was never awaited
I read the documentation and continued to experiment:
async def in_sequence(*tasks):
    """Executes tasks in sequence"""
    _tasks = []
    for task in tasks:
        _tasks.append(asyncio.create_task(task))
    for _task in _tasks:
        await _task
And this version worked as expected!
In this regard, I have the following questions:
Upvotes: 1
Views: 3672
Reputation: 342
Taking inspiration from user4815162342's and Anton Pomieshchenko's solutions, I came up with this variation of it:
import inspect

async def in_sequence(*storm):
    twister = iter(storm)
    for task in twister:
        task = task()  # if it's a regular function, it's done here.
        if inspect.isawaitable(task):
            try:
                await task  # if it's also awaitable, await it
            finally:
                # throwing into a finished coroutine would raise RuntimeError,
                # so just make sure the coroutine is closed
                if inspect.iscoroutine(task):
                    task.close()
    assert not any(twister)  # optionally verify that the iterator is now empty
This way you can combine regular functions with coroutines in this in_sequence. But make sure to call it like this:
await in_sequence(*[b.despawn, b.release])
Notice the lack of () (i.e. __call__()), because otherwise the regular function would be called immediately and the coroutine would trigger a RuntimeWarning for having never been awaited. (In my example, b.despawn is a coroutine and b.release is not.)
You can also do an additional check for callable(task) just before invoking task(), but it's up to you.
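As a rough illustration of the calling convention, here is a stripped-down sketch of the same idea. The `Unit` class and its methods are hypothetical stand-ins for `b`, and `functools.partial` is just one possible way to bind arguments while still passing a callable rather than a coroutine object:

```python
import asyncio
import functools
import inspect

async def in_sequence(*callables):
    """Call each item in order; await the result only if it is awaitable."""
    for fn in callables:
        result = fn()  # a regular function has already done its work here
        if inspect.isawaitable(result):
            await result  # a coroutine function returned a coroutine object

class Unit:  # hypothetical stand-in for the object `b`
    def __init__(self):
        self.events = []

    async def despawn(self, reason="no reason"):
        await asyncio.sleep(0)
        self.events.append(f"despawned ({reason})")

    def release(self):
        self.events.append("released")

b = Unit()
asyncio.run(in_sequence(
    functools.partial(b.despawn, reason="timeout"),  # still not called yet
    b.release,
))
print(b.events)  # ['despawned (timeout)', 'released']
```

Because nothing is called until in_sequence reaches it, no coroutine object exists ahead of time, so there is nothing left un-awaited if an earlier step fails.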
Upvotes: 1
Reputation: 155016
And this version worked as expected!
The problem with the second version is that it doesn't actually run the coroutines sequentially, it runs them in parallel. This is because asyncio.create_task() schedules the coroutine to run in parallel with the current coroutines. So when you await tasks in a loop, you are actually allowing all the tasks to run while awaiting the first one. Despite appearances, the whole loop will run for only as long as the longest task. (See here for more details.)
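A quick way to see this for yourself is to log when each coroutine starts and to time both variants. This is only a sketch: `work`, `run_and_log` and the 0.1 s delay are made up for the demonstration, and the two runner functions mirror the two versions from the question.

```python
import asyncio
import time

async def work(log, name, delay=0.1):
    log.append(f"start {name}")
    await asyncio.sleep(delay)
    log.append(f"finish {name}")

async def with_create_task(*coros):
    # Second version from the question: everything is scheduled up front.
    tasks = [asyncio.create_task(c) for c in coros]
    for t in tasks:
        await t

async def plain_sequence(*coros):
    # First version from the question: one coroutine at a time.
    for c in coros:
        await c

async def run_and_log(runner):
    log = []
    started = time.perf_counter()
    await runner(work(log, "a"), work(log, "b"), work(log, "c"))
    return log, time.perf_counter() - started

concurrent_log, concurrent_time = asyncio.run(run_and_log(with_create_task))
sequential_log, sequential_time = asyncio.run(run_and_log(plain_sequence))

print(concurrent_log[:3])   # all three start before any of them finishes
print(sequential_log[:3])   # "a" starts and finishes before "b" starts
print(f"{concurrent_time:.2f}s vs {sequential_time:.2f}s")
```

With create_task the total time should be close to one delay, while the plain loop should take roughly three.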
The warning displayed by your first version is intended to prevent you from accidentally creating a coroutine that you never await, e.g. writing just asyncio.sleep(1) instead of await asyncio.sleep(1). As far as asyncio is concerned, main is instantiating coroutine objects and passing them to in_sequence which "forgets" to await some of them.
One way to suppress the warning message is to allow the coroutine to spin, but cancel it immediately. For example:
async def in_sequence(*coros):
    remaining = iter(coros)
    for coro in remaining:
        try:
            await coro
        except Exception:
            for c in remaining:
                asyncio.create_task(c).cancel()
            raise
Note that, by convention, a variable name that begins with an underscore marks an unused variable, so you shouldn't use such names for variables you actually do use.
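To check that the cancel-on-failure variant behaves as described, something along these lines could be used. It is a sketch under assumptions: `some_work` here takes an extra `started` list and a `fail` flag purely for the demonstration, and the warning check relies on CPython finalizing coroutine objects promptly.

```python
import asyncio
import gc
import warnings

async def in_sequence(*coros):
    remaining = iter(coros)
    for coro in remaining:
        try:
            await coro
        except Exception:
            # Wrap the coroutines that never ran in tasks and cancel them at once.
            for c in remaining:
                asyncio.create_task(c).cancel()
            raise

async def some_work(started, name, fail=False):
    started.append(name)  # record that the body actually began running
    await asyncio.sleep(0)
    if fail:
        raise RuntimeError(f"{name} raised an exception")

async def demo(started):
    try:
        await in_sequence(
            some_work(started, "work3", fail=True),
            some_work(started, "work4"),
        )
    except RuntimeError as error:
        return str(error)

def run_demo():
    started = []
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        message = asyncio.run(demo(started))
        gc.collect()
    unawaited = [w for w in caught if "never awaited" in str(w.message)]
    return started, message, unawaited

started, message, unawaited = run_demo()
print(started, message, len(unawaited))
```

Here work4 is cancelled before its body runs, so only work3 should appear in `started`, and no "never awaited" warning should be recorded.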
Upvotes: 1
Reputation: 2167
You said that the version of in_sequence using asyncio.create_task works, but I think it does not. From the docs:
Wrap the coro coroutine into a Task and schedule its execution. Return the Task object.
It seems that it runs the coroutines concurrently, but you need them in sequence.
So I experimented and found two ways to fix this.
Use your original in_sequence function and add this code, which hides the warning:
import warnings
warnings.filterwarnings(
    'ignore',
    message=r'^coroutine .* was never awaited$',
    category=RuntimeWarning
)
Or fix the in_sequence function, like this:
async def in_sequence(*tasks):
    for index, task in enumerate(tasks):
        try:
            await task
        except Exception:
            # close the coroutines that were never started, then re-raise
            for pending in tasks[index + 1:]:
                pending.close()
            raise
Answers to the other questions:
async def test():
    return 1
f = test()
f = None  # after this you will get that warning
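To make the difference between dropping and closing a coroutine object visible, here is a small sketch that records warnings instead of printing them. The helper `count_unawaited` is made up for the demonstration, and the result assumes CPython, where the coroutine object is finalized as soon as its last reference goes away.

```python
import gc
import warnings

async def test():
    return 1

def count_unawaited(action):
    """Apply action to a fresh coroutine object, drop it, count the warnings."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        coro = test()
        action(coro)
        del coro      # last reference gone, the coroutine object is finalized
        gc.collect()  # not needed on CPython, but harmless
    return sum("never awaited" in str(w.message) for w in caught)

dropped = count_unawaited(lambda coro: None)         # never awaited, never closed
closed = count_unawaited(lambda coro: coro.close())  # explicitly closed first
print(dropped, closed)
```

The dropped coroutine should produce the warning, while the one closed with close() should not, which is why the fixed in_sequence closes whatever it did not get to run.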
Upvotes: 2
Reputation: 1826
The first version doesn't work because in_sequence doesn't catch an exception which can be raised on await task, so the remaining coroutines are never awaited. The second works because create_task creates a future-like Task object that runs the coroutine. The Task stores the result or exception of the wrapped coroutine instead of propagating it right away. When you await the Task, the caller suspends until the Task either has a result or an exception set, or until it is cancelled.
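A minimal sketch of that behaviour, with a made-up coroutine `failing`: the Task finishes and keeps its exception, and the exception only reaches the caller once the Task is awaited.

```python
import asyncio

async def failing():
    raise RuntimeError("boom")

async def demo():
    task = asyncio.create_task(failing())
    await asyncio.sleep(0.01)         # give the task time to run and fail
    observations = [task.done()]      # finished, but nothing was raised here
    try:
        await task                    # the stored exception is raised now
    except RuntimeError as error:
        observations.append(str(error))
    return observations

observations = asyncio.run(demo())
print(observations)  # [True, 'boom']
```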
It seems it hasn't.
For the in_sequence function you can:
Pass each async function together with its arguments and create the coroutine in place, e.g.:
async def in_sequence(*fn_and_args):
    for fn, args, kwargs in fn_and_args:
        await fn(*args, **kwargs)  # create a coro and await it in place
in_sequence(
    (some_work, ("work3",), {'raise_exception': True}),
    (some_work, ("work4",), {}),
)
Or just write a plain async function, e.g.:
async def in_sequence():
    await some_work("work3", raise_exception=True)
    await some_work("work4")
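Put together with asyncio.gather, the first variant might look roughly like this. It is a sketch, not the one right way: `some_work` takes an extra `log` list so the run can be inspected, and the delay is shortened to 0.01 s.

```python
import asyncio

async def some_work(log, work_name, raise_exception=False):
    log.append(f"Start {work_name}")
    await asyncio.sleep(0.01)
    if raise_exception:
        raise RuntimeError(f"{work_name} raised an exception")
    log.append(f"Finish {work_name}")

async def in_sequence(*fn_and_args):
    for fn, args, kwargs in fn_and_args:
        await fn(*args, **kwargs)  # the coroutine is created only when its turn comes

async def main(log):
    try:
        await asyncio.gather(
            some_work(log, "work1"),  # runs concurrently with in_sequence
            in_sequence(
                (some_work, (log, "work3"), {"raise_exception": True}),
                (some_work, (log, "work4"), {}),
            ),
        )
    except RuntimeError as error:
        log.append(str(error))
    return log

log = asyncio.run(main([]))
print(log)
```

Since the coroutine for work4 is never created once work3 fails, there is no "never awaited" warning to suppress and nothing to clean up.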
Upvotes: 1