Reputation: 2886
I want to implement a service based on WebSockets in the Tornado framework. When a user closes a WebSocket, I want to notify the other users about this. However, on_close is apparently a regular (synchronous) function, and my _broadcast(str) -> None function is async.
How can I call this function anyway?
import logging

from tornado import ioloop, web, websocket

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SocketHandler(websocket.WebSocketHandler):
    async def open(self, *args, conns, **kwargs):
        logger.info(f"Opened a new connection to client {id(self)}")
        self._conns = conns

    async def on_message(self, message):
        logger.info(f"Client {id(self)} sent message: {message}")
        await self._broadcast(message)

    def on_close(self):
        logger.info(f"Client {id(self)} has left the scene")
        self._conns.remove(self)
        self._broadcast("something")  # TODO: coroutine is created but never awaited

    async def _broadcast(self, msg):
        for conn in self._conns:
            try:
                await conn.write_message(msg)
            except websocket.WebSocketClosedError:
                pass


app = web.Application([
    (r'/ws', SocketHandler)
])

if __name__ == '__main__':
    app.listen(9000)
    ioloop.IOLoop.instance().start()
Upvotes: 2
Views: 585
Reputation: 22134
The simple solution you're looking for is to use asyncio.create_task when calling the coroutine:
def on_close(self):
    logger.info(f"Client {id(self)} has left the scene")
    self._conns.remove(self)
    asyncio.create_task(self._broadcast("something"))
(The legacy Tornado version of this function is tornado.gen.convert_yielded, but now that Tornado and asyncio are integrated, there's no reason not to use the asyncio version for native coroutines.)
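To illustrate the create_task approach outside Tornado, here is a minimal sketch that uses only asyncio. FakeHandler and demo are hypothetical names made up for this example, and asyncio.sleep(0) merely stands in for the real write. The sketch also keeps a reference to each task, since the asyncio documentation recommends holding one so that a task is not garbage-collected before it finishes.

```python
import asyncio


class FakeHandler:
    """Hypothetical stand-in for a WebSocketHandler; not Tornado's API."""

    def __init__(self):
        self.sent = []
        self._tasks = set()  # strong references to in-flight tasks

    async def _broadcast(self, msg):
        await asyncio.sleep(0)  # stands in for awaiting write_message
        self.sent.append(msg)

    def on_close(self):
        # Synchronous callback: schedule the coroutine instead of awaiting it.
        task = asyncio.create_task(self._broadcast("something"))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)


async def demo():
    handler = FakeHandler()
    handler.on_close()  # returns immediately; the task has not run yet
    sent_before = list(handler.sent)
    await asyncio.gather(*handler._tasks)  # let the scheduled task finish
    return sent_before, handler.sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that asyncio.create_task needs a running event loop. That should hold inside on_close when Tornado invokes it, but it would raise a RuntimeError if called from plain synchronous code with no loop running.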
But for this particular problem, the use of await in your _broadcast function is not ideal. Awaiting write_message is used to provide flow control, but create_task doesn't do anything useful with the backpressure provided by await. (write_message is fairly unusual in that calling it both with and without await is fully supported.) In fact, it applies backpressure to the wrong things: one slow connection will slow notifications to all the others that come after it.
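The "one slow connection delays the rest" effect can be seen in a small simulation. This is only a sketch under assumptions: FakeConn is a hypothetical stand-in whose write_message sleeps to imitate a slow client, which is not how Tornado's real write_message behaves internally.

```python
import asyncio
import time


class FakeConn:
    """Hypothetical connection; `delay` imitates how slowly the client drains."""

    def __init__(self, delay):
        self.delay = delay
        self.received_at = None

    async def write_message(self, msg):
        await asyncio.sleep(self.delay)  # simulated backpressure
        self.received_at = time.monotonic()


async def broadcast_sequential(conns, msg):
    # Same shape as the async _broadcast: each await blocks the next write.
    for conn in conns:
        await conn.write_message(msg)


async def demo():
    slow, fast = FakeConn(0.2), FakeConn(0.0)
    start = time.monotonic()
    await broadcast_sequential([slow, fast], "bye")
    # How long the fast client had to wait for its message.
    return fast.received_at - start


if __name__ == "__main__":
    print(f"fast client waited {asyncio.run(demo()):.2f}s")
```

Because the slow connection comes first in the list, the fast one is only served after the slow write completes, even though it could have accepted the message immediately.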
So in this case I'd advise making _broadcast a regular synchronous function:
def _broadcast(self, msg):
    for conn in self._conns:
        try:
            conn.write_message(msg)
        except websocket.WebSocketClosedError:
            pass
If you want to be better able to control memory usage (via the flow control provided by await write_message), you'll need a more complicated solution, probably involving a bounded queue for each connection (in on_close, use put_nowait to add the message to every connection's queue, then have a task that reads from the queue and writes the message with await write_message).
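A rough sketch of that per-connection queue idea, using plain asyncio so it runs without Tornado. Everything here is an assumption for illustration: FakeConn, outbox, writer and broadcast are made-up names, asyncio.sleep(0) stands in for the real network write, and dropping messages when a queue is full is just one possible policy (closing the slow connection would be another).

```python
import asyncio


class FakeConn:
    """Hypothetical stand-in for a WebSocket connection."""

    def __init__(self, maxsize=10):
        # Bounded queue: caps how many messages can pile up per client.
        self.outbox = asyncio.Queue(maxsize=maxsize)
        self.sent = []

    async def write_message(self, msg):
        await asyncio.sleep(0)  # stands in for a flow-controlled write
        self.sent.append(msg)

    async def writer(self):
        # One task per connection drains only that connection's queue,
        # so a slow client delays nobody but itself.
        while True:
            msg = await self.outbox.get()
            await self.write_message(msg)
            self.outbox.task_done()


def broadcast(conns, msg):
    """Synchronous, so it can be called from a callback such as on_close."""
    dropped = 0
    for conn in conns:
        try:
            conn.outbox.put_nowait(msg)
        except asyncio.QueueFull:
            dropped += 1  # assumed policy: drop for clients that lag behind
    return dropped


async def demo():
    conns = [FakeConn(maxsize=2) for _ in range(3)]
    writers = [asyncio.create_task(c.writer()) for c in conns]
    broadcast(conns, "client left")
    await asyncio.gather(*(c.outbox.join() for c in conns))  # wait for delivery
    for w in writers:
        w.cancel()
    await asyncio.gather(*writers, return_exceptions=True)
    return [c.sent for c in conns]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a real handler, the writer task would presumably be started in open and cancelled in on_close.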
Upvotes: 2
Reputation: 9946
I think a solution that uses an asyncio.Queue should work for you.
I made a small class as a mock-up to test this out:
import asyncio
import time


class Thing:
    on_close_q = asyncio.Queue()

    def __init__(self):
        self.conns = range(3)

    def on_close(self, id):
        time.sleep(id)
        print(f'closing {id}')
        self.on_close_q.put_nowait((self, id))

    async def notify(self, msg):
        print('in notify')
        for conn in range(3):
            print(f'notifying {conn} {msg}')


async def monitor_on_close():
    print('monitoring')
    while True:
        instance, id = await Thing.on_close_q.get()
        await instance.notify(f'{id} is closed')
From there, you'll need to run monitor_on_close in the ioloop you get from Tornado. I've never used Tornado, but I think adding something like this to your __main__ block should work:
ioloop.IOLoop.current().add_callback(monitor_on_close)
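For anyone who wants to try the pattern without Tornado, here is a condensed sketch driven by plain asyncio. The names demo, close_events and notified are hypothetical. One assumption worth flagging: the queue is created inside the running loop rather than at class level, which I believe avoids the queue binding to a different event loop on Python versions before 3.10.

```python
import asyncio


async def demo():
    close_events = asyncio.Queue()  # created inside the running loop
    notified = []

    def on_close(conn_id):
        # Synchronous callback: only enqueue, never await.
        close_events.put_nowait(conn_id)

    async def monitor():
        # Long-running consumer that does the async work for each close event.
        while True:
            conn_id = await close_events.get()
            notified.append(f"{conn_id} is closed")
            close_events.task_done()

    task = asyncio.create_task(monitor())
    for conn_id in range(3):
        on_close(conn_id)
    await close_events.join()  # wait until every event has been handled
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return notified


if __name__ == "__main__":
    print(asyncio.run(demo()))
```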
Upvotes: 1