BarakB

Reputation: 33

wait_for timeout to overcome asyncio.Queue not having a timeout is not working

I was trying to run the following code. It is an example of adding items to a queue from another thread and raising an exception on timeout if the queue is not supplied with new items.

import asyncio
import threading
import time


async def responder(queue, value):
    value["value"] = await queue.get()


async def worker(queue):
    value = dict()
    while True:
        print("waiting")
        # new_msg = await queue.get()
        asyncio.wait_for(responder(queue, value), timeout=2)
        print(value)
        print("worked")


class wManager():
    def __init__(self):

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        self.queue = asyncio.Queue()

        self.loop.create_task(worker(self.queue))

        self.msg = 0

    def add(self):
        print("queue Size Before"+str(self.queue.qsize()))
        asyncio.run_coroutine_threadsafe(self.queue.put(self.msg), self.loop)

        self.msg += 1
        print("queue Size After"+str(self.queue.qsize()))

    def start(self):
        self.loop.run_forever()


def print_after_sleep(manager):

    while True:
        print("hi!")
        time.sleep(3)
        manager.add()


def manager_runer(manager):
    manager.start()


if __name__ == "__main__":
    manager = wManager()
    manager_thread = threading.Thread(target=manager_runer, args=(manager,), daemon=True)

    sleepy_thread = threading.Thread(
        target=print_after_sleep, args=(manager,), daemon=True)

    sleepy_thread.start()
    manager_thread.start()
    sleepy_thread.join()

I want a timeout to fire if the queue receives no new items within a predetermined period. The problem is that responder() is never awaited.

Thanks

Upvotes: 1

Views: 4433

Answers (1)

user4815162342

Reputation: 154836

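You're missing an await in front of asyncio.wait_for(). Without it, the call only creates a coroutine object that never runs, which is why responder() is never awaited. You probably also want to wrap the call in try/except to handle the asyncio.TimeoutError raised when the timeout expires.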
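
For illustration, here is a minimal sketch of a worker with both changes applied. It is not a drop-in replacement for the code in the question: it waits on queue.get() directly instead of going through responder(), and the max_timeouts parameter and the demo() helper are hypothetical additions so that the example terminates on its own.

```python
import asyncio


async def worker(queue, timeout=2.0, max_timeouts=1):
    """Collect items until `max_timeouts` consecutive waits time out."""
    received = []
    timeouts = 0
    while timeouts < max_timeouts:
        try:
            # The await is the fix: it runs queue.get() under a deadline.
            item = await asyncio.wait_for(queue.get(), timeout=timeout)
        except asyncio.TimeoutError:
            # No item arrived within `timeout` seconds.
            timeouts += 1
            continue
        timeouts = 0
        received.append(item)
    return received


async def demo():
    # Hypothetical driver: pre-fill the queue, then let the worker drain it.
    queue = asyncio.Queue()
    await queue.put(0)
    await queue.put(1)
    return await worker(queue, timeout=0.1)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Here the worker returns the two queued items and then stops after the first timeout. In a long-running worker like the one in the question, you would more likely log the timeout and keep looping rather than return.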

Upvotes: 2
