Reputation: 33
I was trying to run the following code. It is an example of adding items to a queue from another thread and raising an exception on timeout if the queue is not supplied with new items.
import asyncio
import threading
import time


async def responder(queue, value):
    value["value"] = await queue.get()


async def worker(queue):
    value = dict()
    while True:
        print("waiting")
        # new_msg = await queue.get()
        asyncio.wait_for(responder(queue, value), timeout=2)
        print(value)
        print("worked")


class wManager():
    def __init__(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.queue = asyncio.Queue()
        self.loop.create_task(worker(self.queue))
        self.msg = 0

    def add(self):
        print("queue Size Before"+str(self.queue.qsize()))
        asyncio.run_coroutine_threadsafe(self.queue.put(self.msg), self.loop)
        self.msg += 1
        print("queue Size After"+str(self.queue.qsize()))

    def start(self):
        self.loop.run_forever()


def print_after_sleep(manager):
    while True:
        print("hi!")
        time.sleep(3)
        manager.add()


def manager_runer(manager):
    manager.start()


if __name__ == "__main__":
    manager = wManager()
    manager_thread = threading.Thread(target=manager_runer, args=(manager,), daemon=True)
    sleepy_thread = threading.Thread(
        target=print_after_sleep, args=(manager,), daemon=True)
    sleepy_thread.start()
    manager_thread.start()
    sleepy_thread.join()
I want a timeout to fire if the queue does not receive new items within a predetermined period of time.
The problem is that responder() is not awaited.
Thanks
Upvotes: 1
Views: 4433
Reputation: 154836
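You're missing an await in front of asyncio.wait_for(). Without it, the call only creates a coroutine object that is never run, so responder() is never awaited. You also probably want to add a try/except to handle the asyncio.TimeoutError raised by the call in case of timeout.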
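A minimal sketch of the fix, simplified to a single thread so it runs and terminates on its own. The `results` list, the `max_timeouts` parameter, and the short 0.2 second timeout are assumptions made for illustration and are not part of the original code; the producer thread from the question could keep feeding the queue through asyncio.run_coroutine_threadsafe() as before.

```python
import asyncio


async def worker(queue, results, timeout=0.2, max_timeouts=1):
    """Consume queue items; record a marker whenever the wait times out."""
    timeouts = 0
    # max_timeouts is a hypothetical stop condition so the demo terminates.
    while timeouts < max_timeouts:
        try:
            # The await is what actually runs wait_for(); without it the
            # coroutine object is created and discarded.
            item = await asyncio.wait_for(queue.get(), timeout=timeout)
        except asyncio.TimeoutError:
            # No item arrived within `timeout` seconds.
            timeouts += 1
            results.append("timeout")
        else:
            results.append(item)


async def main():
    queue = asyncio.Queue()
    results = []
    task = asyncio.create_task(worker(queue, results))
    await queue.put(0)
    await queue.put(1)
    await task  # finishes after the first timeout
    return results


demo_results = asyncio.run(main())
print(demo_results)
```

Here the worker awaits queue.get() directly inside wait_for(), so the separate responder() helper is not needed; keeping the helper and awaiting wait_for(responder(queue, value), timeout=2) should behave the same way.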
Upvotes: 2