Discombobulous
Discombobulous

Reputation: 1184

Asyncio.Queue consumer not getting called

I have an asyncio.Queue producer and consumer running as 2 infinite loops. The producer periodically adds jobs to the queue and the consumer waits until a job is available, then processes it, then waits for the next job.

However, for some reason, my consumer is not getting called. I think this is because the producer task never yields to the consumer?

Any ideas on how to fix it so that both workers run in the background as described?

import asyncio
import concurrent.futures
import time

class Consumer:
    """Take detection timestamps off a queue and process them one at a time.

    An event is skipped when its timestamp, minus
    ``_duration_before_restart_ms``, is earlier than the time the previous
    event finished processing.
    """

    def __init__(self, queue: asyncio.Queue):
        # Minimum gap (ms) required between the last trigger and a new event.
        self._duration_before_restart_ms = 3000
        self._queue = queue
        # Wall-clock time (ms) at which the last event finished processing;
        # 0 until the first event has been processed.
        self._last_triggered_time_ms = 0

    async def consumer_loop(self):
        """Loop forever: wait for a timestamp, then skip it or process it."""
        while True:
            print("Consumer new iteration.")
            detected_time_ms = await self._queue.get()
            print("Consumer new event: ", detected_time_ms)
            if (
                detected_time_ms - self._duration_before_restart_ms
                < self._last_triggered_time_ms
            ):
                print("Consumer skipping event: ", detected_time_ms)
                # Invalidate all items in queue that happened before
                # _last_triggered_time_ms.
                continue
            print("Consumer processing event: ", detected_time_ms)
            # Simulate authentication (an io bound operation) with sleep.
            # NOTE(review): time.sleep() is a blocking call; while it runs, no
            # other task on the same event loop can make progress.
            time.sleep(5)
            self._last_triggered_time_ms = int(time.time() * 1000)
            print(
                "Consumer processed event: ",
                detected_time_ms,
                " at: ",
                self._last_triggered_time_ms,
            )


class Producer:
    """Periodically put the current time (ms) on a queue as a detection event."""

    def __init__(self, queue: asyncio.Queue):
        # Minimum gap (ms) between two events put on the queue.
        self._detection_time_period_ms = 3000
        # Time (ms) of the last event put on the queue; 0 until the first one.
        self._last_detection_time_ms = 0
        self._queue = queue

    async def producer_loop(self):
        """Loop forever, enqueueing a timestamp when the conditions below hold."""
        counter = 0
        while True:
            # Iterates at 2fps
            # NOTE(review): time.sleep() blocks the event-loop thread; unlike
            # `await asyncio.sleep()`, it gives other tasks no chance to run.
            time.sleep(0.5)
            print("Producer counter: ", counter)
            current_time_ms = int(time.time() * 1000)

            # Only iterations 6-9 of every 10 may enqueue, and only once the
            # detection period has elapsed since the last enqueued event.
            if (counter % 10 > 5) and (
                self._last_detection_time_ms + self._detection_time_period_ms
                < current_time_ms
            ):
                print("Producer adding to queue: ", current_time_ms)
                await self._queue.put(current_time_ms)
                print("Producer added to queue: ", current_time_ms)
                self._last_detection_time_ms = current_time_ms
            counter += 1


async def main():
    """Create the shared queue and start the producer and consumer tasks."""
    q = asyncio.Queue()
    producer = Producer(q)
    consumer = Consumer(q)
    # NOTE(review): neither task is awaited below, so main() returns as soon
    # as both tasks have been created.
    producer_task = asyncio.create_task(producer.producer_loop())
    consumer_task = asyncio.create_task(consumer.consumer_loop())


if __name__ == "__main__":
    asyncio.run(main())

Upvotes: 1

Views: 787

Answers (1)

furas
furas

Reputation: 143187

Code works for me if I use await asyncio.sleep() instead of time.sleep()

In async code, tasks don't run at the same time; the event loop should switch tasks when it sees an await - and it seems it needs await asyncio.sleep() to have time to switch from producer to consumer, and later to switch back from consumer to producer.

You have await in put() and get() but I can't explain why it doesn't switch tasks. Maybe it switches, but it switches too fast and doesn't have enough time to send data through the queue.


import asyncio
import concurrent.futures
import time

class Consumer:
    """Drain detection timestamps from a queue, processing them one by one.

    A timestamp is ignored when it falls less than
    ``_duration_before_restart_ms`` after the moment the previous event
    finished processing.
    """

    def __init__(self, queue: asyncio.Queue):
        self._queue = queue
        # Minimum gap (ms) between the last trigger and an acceptable event.
        self._duration_before_restart_ms = 3000
        # Time (ms) the last event finished processing; 0 before the first.
        self._last_triggered_time_ms = 0

    async def consumer_loop(self):
        """Run forever: await the next timestamp, then skip or process it."""
        print('start consumer')

        while True:
            print("Consumer new iteration.")
            event_ms = await self._queue.get()
            print("Consumer new event: ", event_ms)

            # Events detected before the restart window has elapsed are stale.
            earliest_allowed_ms = (
                self._last_triggered_time_ms + self._duration_before_restart_ms
            )
            if event_ms >= earliest_allowed_ms:
                print("Consumer processing event: ", event_ms)
                # Stand-in for authentication (an I/O-bound operation); the
                # awaited sleep lets other tasks run in the meantime.
                await asyncio.sleep(5)
                finished_ms = int(time.time() * 1000)
                self._last_triggered_time_ms = finished_ms
                print(
                    "Consumer processed event: ",
                    event_ms,
                    " at: ",
                    finished_ms,
                )
            else:
                print("Consumer skipping event: ", event_ms)


class Producer:
    """Emit the current time (ms) onto a queue as a periodic detection event."""

    def __init__(self, queue: asyncio.Queue):
        self._queue = queue
        # Time (ms) of the last enqueued event; 0 before the first one.
        self._last_detection_time_ms = 0
        # Minimum gap (ms) between two enqueued events.
        self._detection_time_period_ms = 3000

    async def producer_loop(self):
        """Run forever, enqueueing a timestamp whenever a detection is due."""
        print('start producer')

        tick = 0
        while True:
            # Iterates at 2fps
            await asyncio.sleep(0.5)
            print("Producer counter: ", tick)
            now_ms = int(time.time() * 1000)

            # Only ticks 6-9 of every 10 count as detections, and only once
            # the detection period has elapsed since the last enqueued event.
            in_detection_window = tick % 10 > 5
            next_allowed_ms = (
                self._last_detection_time_ms + self._detection_time_period_ms
            )
            period_elapsed = next_allowed_ms < now_ms
            tick += 1

            if not (in_detection_window and period_elapsed):
                continue

            print("Producer adding to queue: ", now_ms)
            await self._queue.put(now_ms)
            print("Producer added to queue: ", now_ms)
            self._last_detection_time_ms = now_ms


async def main():
    """Run the producer and the consumer concurrently on one shared queue.

    Both loops are infinite, so this coroutine only finishes if one of the
    tasks raises (the exception propagates to the caller) or it is cancelled.
    """
    q = asyncio.Queue()
    producer = Producer(q)
    consumer = Consumer(q)
    producer_task = asyncio.create_task(producer.producer_loop())
    consumer_task = asyncio.create_task(consumer.consumer_loop())

    # Await both tasks: waiting on the producer alone would let a consumer
    # failure go unnoticed while the producer keeps filling the queue.
    await asyncio.gather(producer_task, consumer_task)


if __name__ == "__main__":
    asyncio.run(main())

Upvotes: 1

Related Questions