Bunny

Reputation: 342

python asyncio.create_task tasks exiting earlier than expected?

I have the following code:

import asyncio

async def myfunc(i):
    print("hello", i)
    await asyncio.sleep(i)
    print("world", i)

async def main():
    asyncio.create_task(myfunc(2))
    asyncio.create_task(myfunc(1))
    
asyncio.run(main())

It outputs:

hello 2
hello 1

Notice that world isn't printed anywhere. Why is the output we see being produced? I was expecting:

hello 2
hello 1
world 1
world 2

I expected this because I thought the asyncio.sleep(i) calls would yield execution to the event loop, which would then resume each task once its sleep had elapsed. Clearly I am misunderstanding something. Can someone explain?

Upvotes: 0

Views: 1440

Answers (2)

Bunny

Reputation: 342

Found a much simpler solution than the one provided by @eyllanesc here. Turns out there is a function that implements it

Upvotes: 0

eyllanesc

Reputation: 243887

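The problem is that main() returns as soon as it has created the tasks, without waiting for them. asyncio.run() then shuts the loop down and cancels the tasks that are still pending, so they never get past their sleep. Use asyncio.gather() to launch the coroutines and wait for all of them to finish.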

import asyncio


async def myfunc(i):
    print("hello", i)
    await asyncio.sleep(i)
    print("world", i)


async def main():
    await asyncio.gather(myfunc(2), myfunc(1))


asyncio.run(main())
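
If you would rather keep asyncio.create_task() as in the question, a minimal sketch is to hold on to the task objects and await them before main() returns. The unit parameter and the return values are my own additions (not in the original code), there only so the delays can be scaled down when trying it out.

```python
import asyncio


async def myfunc(i, unit=1.0):
    # `unit` is a hypothetical scaling factor, not part of the question's code.
    print("hello", i)
    await asyncio.sleep(i * unit)
    print("world", i)
    return i


async def main(unit=1.0):
    # Keep references to the tasks so they can be awaited later.
    t2 = asyncio.create_task(myfunc(2, unit))
    t1 = asyncio.create_task(myfunc(1, unit))
    # main() now stays alive until both tasks have finished,
    # so asyncio.run() has nothing left to cancel.
    await t2
    await t1
    return [t1.result(), t2.result()]


if __name__ == "__main__":
    asyncio.run(main())
```

Both tasks still run concurrently here; awaiting t2 first only decides where main() waits, not the order in which the tasks make progress.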

The logic that you describe in the comments is more complicated, and there is no function that implements it, so you have to design the logic yourself. In this case, two conditions must hold before the application can finish:

  • All tasks must be launched.
  • There should be no active task.

With that in mind, I use a Future as a flag that signals when both conditions hold, and add_done_callback to be notified each time a task finishes.

import asyncio
from collections import deque
import random


async def f(identifier, seconds):
    print(f"Starting task: {identifier}, seconds: {seconds}s")
    await asyncio.sleep(seconds)
    print(f"Finished task: {identifier}")
    return identifier


T = 3


class Manager:
    def __init__(self, q):
        self._q = q
        # Manager is created inside main(), so a loop is already running.
        self._future = asyncio.get_running_loop().create_future()
        self._active_tasks = set()
        self._status = False

    @property
    def q(self):
        return self._q

    def launch_task(self):
        try:
            identifier = self.q.pop()
        except IndexError:
            return False
        else:
            seconds = random.uniform(T - 1, T + 1)
            task = asyncio.create_task(f(identifier, seconds))
            self._active_tasks.add(identifier)
            task.add_done_callback(self._finished_callback)
            return True

    async def work(self):
        self._status = True
        while self.launch_task():
            await asyncio.sleep(T)
        self._status = False
        await self._future

    def _finished_callback(self, c):
        self._active_tasks.remove(c.result())
        if not self._active_tasks and not self._status:
            self._future.set_result(None)


async def main():
    identifiers = deque(range(10))
    manager = Manager(identifiers)
    await manager.work()


if __name__ == "__main__":
    asyncio.run(main())
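
For comparison, here is a rough sketch of a shorter variant. It assumes the requirement is only "start one task every few seconds and exit once every task has finished", which may not match everything discussed in the comments. The names launch_staggered, interval and jitter are hypothetical and do not appear in the code above; unlike the deque version, it also starts the identifiers in iteration order.

```python
import asyncio
import random


async def f(identifier, seconds):
    print(f"Starting task: {identifier}, seconds: {seconds:.2f}s")
    await asyncio.sleep(seconds)
    print(f"Finished task: {identifier}")
    return identifier


async def launch_staggered(identifiers, interval, jitter):
    """Start one task per identifier, `interval` seconds apart, then wait for all.

    Hypothetical helper: each task sleeps for a random time in
    [interval - jitter, interval + jitter], so `jitter` should not exceed `interval`.
    """
    tasks = []
    for n, identifier in enumerate(identifiers):
        if n:
            # Pause between launches; already-started tasks keep running meanwhile.
            await asyncio.sleep(interval)
        seconds = random.uniform(interval - jitter, interval + jitter)
        tasks.append(asyncio.create_task(f(identifier, seconds)))
    # gather() returns the results in the order the tasks were passed in.
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    # Short delays for a quick demonstration.
    print(asyncio.run(launch_staggered(range(3), 0.2, 0.1)))
```

Calling launch_staggered(range(10), 3, 1) would roughly mirror the parameters used above (T = 3, durations between T - 1 and T + 1). The trade-off is that this version has no hook for reacting to each task as it completes, which the callback-based Manager does.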

Upvotes: 1
