Amit Gupta

Reputation: 303

asyncio.Queue.get() gets stuck on empty queue

In the following code, a producer runs in its own thread, generating Runnable objects and putting them in an asyncio.Queue. The consumer runs as a task in the asyncio event loop. Everything works as long as the queue is not empty. Once await queue.get() encounters an empty queue, it never returns, even when items are subsequently added to the queue. I am running Python 3.7.6.

import threading
import asyncio
import random
import time
import logging

logger = logging.getLogger()
logger.setLevel(0)
formatter = logging.Formatter('%(asctime)-15s %(levelname)-5s| %(name)s: %(funcName)s: %(message)s')
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(formatter)
logger.addHandler(consoleHandler)


#=======================================================================
class Runnable(object):
    def __init__(self, id, load):
        self.id = id
        self.load = load
        return

    async def run(self):
        logger.info(f"Running: {self.id}, load: {self.load}")
        await asyncio.sleep(self.load)
        logger.info(f"Done Running: {self.id}, load: {self.load}")
        return

    
#=======================================================================
def producer(queue):
    logger.info("Starting producer")
    eventId = 0
    
    while(True):
        eventId += 1
        load = random.uniform(1, 5)
        r = Runnable(eventId, load)
        queue.put_nowait(r)
        logger.info(f"Added runnable. qsize: {queue.qsize()}")
        time.sleep(3)
    return


async def consumer(queue):
    logger.info("Starting consumer")
    while(True):
        logger.info(f"Waiting for runnable. qsize: {queue.qsize()}")
        r = await queue.get()
        logger.info("Received runnable")
        await r.run()
        logger.info("Done with processing runnable")
    return


def main():
    # create and start a producer
    queue = asyncio.Queue(20)
    prodThd = threading.Thread(target=producer, args=(queue,))
    prodThd.start()

    # start consuming on the async event_loop
    loop = asyncio.get_event_loop()
    task = loop.create_task(consumer(queue))
    loop.run_until_complete(task)
    return

#=======================================================================
if (__name__ == '__main__'):
    main()

Sample output:

2020-06-26 19:52:13,555 DEBUG| asyncio: __init__: Using selector: SelectSelector
2020-06-26 19:52:13,561 INFO | root: producer: Starting producer
2020-06-26 19:52:13,561 INFO | root: producer: Added runnable. qsize: 1
2020-06-26 19:52:13,561 INFO | root: consumer: Starting consumer
2020-06-26 19:52:13,561 INFO | root: consumer: Waiting for runnable. qsize: 1
2020-06-26 19:52:13,561 INFO | root: consumer: Received runnable
2020-06-26 19:52:13,561 INFO | root: run: Running: 1, load: 1.320570165693137
2020-06-26 19:52:14,882 INFO | root: run: Done Running: 1, load: 1.320570165693137
2020-06-26 19:52:14,882 INFO | root: consumer: Done with processing runnable
2020-06-26 19:52:14,882 INFO | root: consumer: Waiting for runnable. qsize: 0
2020-06-26 19:52:16,562 INFO | root: producer: Added runnable. qsize: 1
2020-06-26 19:52:19,562 INFO | root: producer: Added runnable. qsize: 2
2020-06-26 19:52:22,564 INFO | root: producer: Added runnable. qsize: 3
2020-06-26 19:52:25,565 INFO | root: producer: Added runnable. qsize: 4
2020-06-26 19:52:28,566 INFO | root: producer: Added runnable. qsize: 5
2020-06-26 19:52:31,568 INFO | root: producer: Added runnable. qsize: 6
2020-06-26 19:52:34,569 INFO | root: producer: Added runnable. qsize: 7
2020-06-26 19:52:37,569 INFO | root: producer: Added runnable. qsize: 8
2020-06-26 19:52:40,569 INFO | root: producer: Added runnable. qsize: 9
2020-06-26 19:52:43,570 INFO | root: producer: Added runnable. qsize: 10
2020-06-26 19:52:46,571 INFO | root: producer: Added runnable. qsize: 11
2020-06-26 19:52:49,572 INFO | root: producer: Added runnable. qsize: 12
2020-06-26 19:52:52,573 INFO | root: producer: Added runnable. qsize: 13
2020-06-26 19:52:55,574 INFO | root: producer: Added runnable. qsize: 14
2020-06-26 19:52:58,575 INFO | root: producer: Added runnable. qsize: 15
2020-06-26 19:53:01,576 INFO | root: producer: Added runnable. qsize: 16
2020-06-26 19:53:04,576 INFO | root: producer: Added runnable. qsize: 17
2020-06-26 19:53:07,577 INFO | root: producer: Added runnable. qsize: 18
2020-06-26 19:53:10,579 INFO | root: producer: Added runnable. qsize: 19
2020-06-26 19:53:13,579 INFO | root: producer: Added runnable. qsize: 20

Upvotes: 1

Views: 1671

Answers (1)

user2357112

Reputation: 282043

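You're adding items to the queue from a completely different thread, outside the event loop. Asyncio queues can only be safely used from within the event loop's thread. A put_nowait() call made from another thread does not wake the loop, so the consumer's pending get() is never resumed even though qsize() keeps growing. The queues are explicitly not thread-safe; quoting the docs: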

This class is not thread safe.
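
One way to keep the producer thread, sketched here under some assumptions, is to have the thread ask the loop to perform the put via loop.call_soon_threadsafe(), so that put_nowait() runs on the event loop's thread and wakes the waiting consumer. The names run_demo and count, the plain integer items, and the None sentinel used to stop the consumer are illustrative and not part of the original code.

```python
import asyncio
import threading
import time


def producer(loop, queue, count):
    """Runs in a worker thread and hands items over to the event loop."""
    for event_id in range(1, count + 1):
        # Schedule put_nowait on the loop's thread instead of calling it here.
        loop.call_soon_threadsafe(queue.put_nowait, event_id)
        time.sleep(0.01)
    # Hypothetical sentinel: None tells the consumer to stop.
    loop.call_soon_threadsafe(queue.put_nowait, None)


async def consumer(queue):
    """Collects items until the sentinel arrives."""
    received = []
    while True:
        item = await queue.get()
        if item is None:
            break
        received.append(item)
    return received


async def main(count):
    queue = asyncio.Queue(20)
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=producer, args=(loop, queue, count))
    thread.start()
    received = await consumer(queue)
    thread.join()
    return received


def run_demo(count=5):
    # asyncio.run requires Python 3.7 or later.
    return asyncio.run(main(count))
```

Calling run_demo() should return the items in the order they were produced. Note that put_nowait() raises asyncio.QueueFull inside the loop if the bounded queue is full; if the producer should block instead, asyncio.run_coroutine_threadsafe(queue.put(item), loop).result() is an alternative worth considering.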

Upvotes: 2
