Reputation: 25
@asyncio.coroutine
def listener():
    while True:
        message = yield from websocket.recieve_message()
        if message:
            yield from handle(message)

loop = asyncio.get_event_loop()
loop.run_until_complete(listener())
Let's say I'm using websockets with asyncio, so I receive messages from a websocket. When I receive a message, I want to handle it, but my code loses all the benefit of async: the yield from handle(message) call makes the listener wait until that message has been fully handled before it reads the next one. How can I make this non-blocking, so that multiple messages are handled at the same time, without waiting for one message to finish before starting on the next?
Thanks.
Upvotes: 1
Views: 7275
Reputation: 198
If you don't care about the return value of handle(message), you can simply create a new Task for it, which will run in the event loop alongside your websocket reader. Here is a simple example:
@asyncio.coroutine
def listener():
    while True:
        message = yield from websocket.recieve_message()
        if message:
            # Schedule the handler as a Task and go straight back to reading.
            asyncio.ensure_future(handle(message))
ensure_future will create a task and schedule it on the default event loop. Since the loop is already running, the task is processed concurrently with your websocket reader (interleaved on a single thread, not in parallel). In fact, if it is a slow, I/O-bound task (like sending an email), you could easily have a few dozen handle(message) tasks in flight at once. They are created dynamically when needed and destroyed when finished, with much lower overhead than spawning threads.
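For newer Python versions, the same fire-and-forget idea can be sketched with async/await and asyncio.create_task. This is only a minimal sketch: fake_websocket, handle, and run_demo are hypothetical stand-ins (not part of any websocket library), and the 0.1 s sleep just simulates slow I/O.

```python
import asyncio
import time


async def fake_websocket(messages):
    # Hypothetical stand-in for a real websocket: yields messages one by one.
    for message in messages:
        await asyncio.sleep(0)
        yield message


async def handle(message, results):
    # Hypothetical handler; the sleep simulates slow I/O such as sending an email.
    await asyncio.sleep(0.1)
    results.append(message)


async def listener(messages, results):
    pending = set()
    async for message in fake_websocket(messages):
        # Schedule the handler and immediately go back to reading.
        task = asyncio.create_task(handle(message, results))
        # Keep a reference so the task is not garbage-collected mid-flight.
        pending.add(task)
        task.add_done_callback(pending.discard)
    # The demo source is finite, so wait for the outstanding handlers here.
    await asyncio.gather(*pending)


async def run_demo(messages):
    results = []
    start = time.perf_counter()
    await listener(messages, results)
    return results, time.perf_counter() - start


if __name__ == "__main__":
    handled, elapsed = asyncio.run(run_demo(["a", "b", "c", "d", "e"]))
    print(sorted(handled), round(elapsed, 2))
```

Because the five handlers overlap, the whole run should take roughly as long as one handler rather than five. Keep in mind that an exception raised inside a fire-and-forget task is only surfaced when something awaits the task or inspects its result.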
If you want a bit more control, you could have the reader write to an asyncio.Queue and let a fixed-size pool of consumer tasks read from it, a typical pattern in multi-threaded or multi-process programming.
@asyncio.coroutine
def consumer(queue):
    while True:
        message = yield from queue.get()
        yield from handle(message)

@asyncio.coroutine
def listener(queue):
    # Start a fixed pool of 5 consumers that share the same queue.
    for i in range(5):
        asyncio.ensure_future(consumer(queue))
    while True:
        message = yield from websocket.recieve_message()
        if message:
            # Use the queue passed in as an argument, not the global q.
            yield from queue.put(message)

q = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.run_until_complete(listener(q))
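The queue-plus-worker-pool pattern above could look roughly like this with async/await. Again, this is a sketch under assumptions: handle and run_pool are hypothetical names, the list of messages replaces the real websocket, and the clean shutdown via queue.join() and cancel() is my addition so the example terminates.

```python
import asyncio


async def handle(message, results):
    # Hypothetical handler; the sleep simulates slow I/O.
    await asyncio.sleep(0.05)
    results.append(message)


async def consumer(queue, results):
    while True:
        message = await queue.get()
        try:
            await handle(message, results)
        finally:
            # Mark the item as processed so queue.join() can return.
            queue.task_done()


async def run_pool(messages, num_workers=5):
    queue = asyncio.Queue()
    results = []
    # Fixed-size pool: at most num_workers messages are handled at once.
    workers = [
        asyncio.create_task(consumer(queue, results))
        for _ in range(num_workers)
    ]
    # In a real listener these would come from the websocket instead.
    for message in messages:
        await queue.put(message)
    # Wait until every queued message has been handled, then stop the workers.
    await queue.join()
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(sorted(asyncio.run(run_pool(list(range(12)), num_workers=3))))
```

Compared with creating one task per message, the pool caps how many handlers run at once. Passing a maxsize to asyncio.Queue would additionally make the reader wait when the consumers fall behind.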
Upvotes: 3