Cookie
Cookie

Reputation: 25

Asyncio and infinite loop

@asyncio.coroutine
    def listener():
        while True:
            message = yield from websocket.recieve_message()
            if message:
                yield from handle(message)

loop = asyncio.get_event_loop()
loop.run_until_complete(listener())

Let's say i'm using websockets with asyncio. That means I recieve messages from websockets. And when I recieve a message, I want to handle the message but I'm loosing all the async thing with my code. Because the yield from handle(message) is definetly blocking... How could I find a way to make it non-blocking ? Like, handle multiple messages in the same time. Not having to wait the message to be handled before I can handle another message.

Thanks.

Upvotes: 1

Views: 7275

Answers (1)

Ethan Frey
Ethan Frey

Reputation: 198

If you don't care about the return value from handle message, you can simply create a new Task for it, which will run in the event loop alongside your websocket reader. Here is a simple example:

@asyncio.coroutine
def listener():
    while True:
        message = yield from websocket.recieve_message()
        if message:
            asyncio.ensure_future(handle(message))

ensure_future will create a task and attach it to the default event loop. Since the loop is already running, it will get processed alongside your websocket reader in parallel. In fact, if it is a slow-running I/O blocked task (like sending an email), you could easily have a few dozen handle(message) tasks running at once. They are created dynamically when needed, and destroyed when finished (with much lower overhead than spawning threads).

If you want a bit more control, you could simply write to an asyncio.Queue in the reader and have a task pool of a fixed size that can consume the queue, a typical pattern in multi-threaded or multi-process programming.

@asyncio.coroutine
def consumer(queue):
    while True:
        message = yield from queue.get()
        yield from handle(message)

@asyncio.coroutine
def listener(queue):
    for i in range(5):
         asyncio.ensure_future(consumer(queue))
    while True:
        message = yield from websocket.recieve_message()
        if message:
            yield from q.put(message)

q = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.run_until_complete(listener(q))

Upvotes: 3

Related Questions