Green 绿色
Green 绿色

Reputation: 2886

Call async functions in blocking context in Tornado

I want to implement a service based on web sockets in the Tornado framework. When a user closes a web socket, I want to notify the other users about this. However, on_close is apparently a blocking function and my _broadcast(str) -> None function is async.

How can I call this function anyway?

from tornado import websocket

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SocketHandler(websocket.WebSocketHandler):
    async def open(self, *args, conns, **kwargs):
        logger.info(f"Opened a new connection to client {id(self)}")
        self._conns = conns

    async def on_message(self, message):
        logger.info(f"Client {id(self)} sent message: {message}")
        await self._broadcast(message)

    def on_close(self):
        logger.info(f"Client {id(self)} has left the scene")
        self._conns.remove(self)
        self._broadcast("something")  # TODO

    async def _broadcast(self, msg): 
        for conn in self._conns: 
            try:
                await conn.write_message(msg)
            except websocket.WebSocketClosedError:
                pass


app = web.Application([
    (r'/ws', SocketHandler)
])

if __name__ == '__main__':
    app.listen(9000)
    ioloop.IOLoop.instance().start()

Upvotes: 2

Views: 585

Answers (2)

Ben Darnell
Ben Darnell

Reputation: 22134

The simple solution you're looking for is to use asyncio.create_task when calling the coroutine:

def on_close(self):
    logger.info(f"Client {id(self)} has left the scene")
    self._conns.remove(self)
    asyncio.create_task(self._broadcast("something"))

(the legacy Tornado version of this function is tornado.gen.convert_yielded, but now that Tornado and asyncio are integrated there's no reason not to use the asyncio version for native coroutines)

But for this particular problem, the use of await in your _broadcast function is not ideal. Awaiting a write_message is used to provide flow control, but create_task doesn't do anything useful with the backpressure provided by await. (write_message is fairly unusual in that it is fully supported to call it both with and without await). In fact, it applies backpressure to the wrong things - one slow connection will slow notifications to all the others that come after it.

So in this case I'd advise making _broadcast a regular synchronous function:

def _broadcast(self, msg): 
    for conn in self._conns: 
        try:
            conn.write_message(msg)
        except websocket.WebSocketClosedError:
            pass

If you want to be better able to control memory usage (via the flow control provided by await write_message), you'll need a more complicated solution, probably involving a bounded queue for each connection (in on_close, use put_nowait to add the message to every connection's queue, then have a task that reads from the queue and writes the message with await write_message)

Upvotes: 2

acushner
acushner

Reputation: 9946

i think a solution that involves using an asyncio.Queue should work for you.

i made a small class as a mock-up to test this out:

import asyncio
import time


class Thing:
    on_close_q = asyncio.Queue()

    def __init__(self):
        self.conns = range(3)

    def on_close(self, id):
        time.sleep(id)
        print(f'closing {id}')
        self.on_close_q.put_nowait((self, id))

    async def notify(self, msg):
        print('in notify')
        for conn in range(3):
            print(f'notifying {conn} {msg}')


async def monitor_on_close():
    print('monitoring')
    while True:
        instance, id = await Thing.on_close_q.get()
        await instance.notify(f'{id} is closed')

from there, you'll need to run monitor_on_close in the ioloop you get from tornado. i've never used tornado, but i think adding something like this to your __main__ block should work:

    ioloop.IOLoop.current().add_callback(monitor_on_close) 

Upvotes: 1

Related Questions