Nick Martin
Nick Martin

Reputation: 729

Fire coroutine from instide a for loop

I'm trying to fire a coroutine from within a loop. Here's a simple example of what I'm trying to achieve:

import time
import random
import asyncio


def listen():
    while True:
       yield random.random()
       time.sleep(3)


async def dosomething(data: float):
    print("Working on data", data)
    asyncio.sleep(2)
    print("Processed data!")


async def main():
    for pos in listen():
        asyncio.create_task(dosomething(pos))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Unfortunately, this doesn't work and my dosomething coroutine never executes... what am I doing wrong?

Upvotes: 1

Views: 202

Answers (2)

Nick Martin
Nick Martin

Reputation: 729

I wanted to point out that after playing around I ended up using a producer, consumer architecture to achieve what I wanted. I appreciate that I didn't make my exact use case clear in the original question. But here's a simplified snippet of what I ended up implementing:

import asyncio
import random
from datetime import datetime
from pydantic import BaseModel


class Measurement(BaseModel):
    data: float
    time: datetime


async def measure(queue: asyncio.Queue):
    while True:
        # Replicate blocking call to recieve data
        await asyncio.sleep(1)
        print("Measurement complete!")
        for i in range(3):
            data = Measurement(
                data=random.random(),
                time=datetime.utcnow()
            )
            await queue.put(data)

    await queue.put(None)


async def process(queue: asyncio.Queue):
    while True:
        data = await queue.get()
        print(f"Got measurement! {data}")
        # Replicate pause for http request
        await asyncio.sleep(0.3)
        print("Sent data to server")


loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
meansurement = measure(queue)
processor = process(queue)
loop.run_until_complete(asyncio.gather(processor, meansurement))
loop.close()

I should point out here (something I didn't quite understand) that it's imperative that any blocking calls you make are able to be await-ed. Otherwise, you might find that the consumer will never execute.

Upvotes: 0

RomanPerekhrest
RomanPerekhrest

Reputation: 92854

asyncio.create_task function is aimed to schedule Task execution, it should be awaited to wait until it is complete.

Moreover, asyncio.sleep(2) in your code also should awaited, otherwise it'll throw an error/warning.

The right way:

import time
import random
import asyncio


def listen():
    while True:
       yield random.random()
       time.sleep(3)


async def dosomething(data: float):
    print("Working on data", data)
    await asyncio.sleep(2)
    print("Processed data!")


async def main():
    for pos in listen():
        await asyncio.create_task(dosomething(pos))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Sample output:

Working on data 0.9645515392725723
Processed data!
Working on data 0.9249656672476657
Processed data!
Working on data 0.13635467058997397
Processed data!
Working on data 0.03941252405458562
Processed data!
Working on data 0.6299882183389822
Processed data!
Working on data 0.9143748948769984
Processed data!
...

Upvotes: 1

Related Questions