Reputation: 57188
aiohttp has built-in support for websockets. It's very simple and works well.
A simplified version of the example in the docs is:
from aiohttp import web

async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    # Async iterate the messages the client sends
    async for message in ws:
        # send_str is a coroutine in aiohttp 3.x and must be awaited
        await ws.send_str('You sent: %s' % (message.data,))
    print('websocket connection closed')
    return ws
In the example, ws is a reference to a websocket connection with a client. I could easily put these references into request.app, like @Crandel does here (i.e., global state), but that won't work in a production app, because each app server (and even each worker) will have its own app instance.
Is there an accepted pattern for this? Is there another way?
Note: I'm not referring to sessions. I'm referring to connections. I want to send a message to clients that connected to server A when events occur in application code in server B, etc.
Upvotes: 5
Views: 8525
Reputation: 348
If I'm understanding you correctly, you want to have multiple websocket servers, each with multiple clients connected, but you want to be able to communicate potentially with all of the connected clients.
Here is an example that creates three trivial servers -- a capitalization echo, a random quote, and time of day -- and then sends a broadcast message to all of the connected clients. Maybe this has some useful ideas in it.
Pastebin: https://pastebin.com/xDSACmdV
#!/usr/bin/env python3
"""
Illustrates how to have multiple websocket servers running and send
messages to all their various clients at once.
In response to stackoverflow question:
https://stackoverflow.com/questions/35820782/how-to-manage-websockets-across-multiple-servers-workers
Pastebin: https://pastebin.com/xDSACmdV
"""
import asyncio
import datetime
import random
import time
import webbrowser
import aiohttp
from aiohttp import web
__author__ = "Robert Harder"
__email__ = "[email protected]"
__license__ = "Public Domain"
def main():
    # Create servers
    cap_srv = CapitalizeEchoServer(port=9990)
    rnd_srv = RandomQuoteServer(port=9991)
    tim_srv = TimeOfDayServer(port=9992)

    # Queue their start operation
    loop = asyncio.get_event_loop()
    loop.create_task(cap_srv.start())
    loop.create_task(rnd_srv.start())
    loop.create_task(tim_srv.start())

    # Open web pages to test them
    webtests = [9990, 9991, 9991, 9992, 9992]
    for port in webtests:
        url = "http://www.websocket.org/echo.html?location=ws://localhost:{}".format(port)
        webbrowser.open(url)
    print("Be sure to click 'Connect' on the webpages that just opened.")

    # Queue a simulated broadcast-to-all message
    def _alert_all(msg):
        print("Sending alert:", msg)
        msg_dict = {"alert": msg}
        cap_srv.broadcast_message(msg_dict)
        rnd_srv.broadcast_message(msg_dict)
        tim_srv.broadcast_message(msg_dict)

    loop.call_later(17, _alert_all, "ALL YOUR BASE ARE BELONG TO US")

    # Run event loop
    loop.run_forever()
class MyServer:
    def __init__(self, port):
        self.port = port  # type: int
        self.loop = None  # type: asyncio.AbstractEventLoop
        self.app = None  # type: web.Application
        self.srv = None  # type: asyncio.base_events.Server

    async def start(self):
        self.loop = asyncio.get_event_loop()
        self.app = web.Application()
        self.app["websockets"] = []  # type: [web.WebSocketResponse]
        self.app.router.add_get("/", self._websocket_handler)
        await self.app.startup()
        handler = self.app.make_handler()
        self.srv = await asyncio.get_event_loop().create_server(handler, port=self.port)
        print("{} listening on port {}".format(self.__class__.__name__, self.port))

    async def close(self):
        assert self.loop is asyncio.get_event_loop()
        self.srv.close()
        await self.srv.wait_closed()
        # Iterate over a copy: handlers remove their socket from the list as they exit
        for ws in list(self.app["websockets"]):  # type: web.WebSocketResponse
            await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server shutdown')
        await self.app.shutdown()
        await self.app.cleanup()

    async def _websocket_handler(self, request):
        assert self.loop is asyncio.get_event_loop()
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        self.app["websockets"].append(ws)
        try:
            await self.do_websocket(ws)
        finally:
            # Always unregister the socket, even if do_websocket raises
            self.app["websockets"].remove(ws)
        return ws

    async def do_websocket(self, ws: web.WebSocketResponse):
        async for ws_msg in ws:  # type: aiohttp.WSMessage
            pass

    def broadcast_message(self, msg: dict):
        # send_json is a coroutine in aiohttp 3.x, so schedule each send on the
        # loop; this keeps broadcast_message callable from synchronous code.
        for ws in self.app["websockets"]:  # type: web.WebSocketResponse
            self.loop.create_task(ws.send_json(msg))
class CapitalizeEchoServer(MyServer):
    """ Echoes back to client whatever they sent, but capitalized. """

    async def do_websocket(self, ws: web.WebSocketResponse):
        async for ws_msg in ws:  # type: aiohttp.WSMessage
            cap = ws_msg.data.upper()
            # send_str is a coroutine in aiohttp 3.x and must be awaited
            await ws.send_str(cap)
class RandomQuoteServer(MyServer):
    """ Sends a random quote to the client every so many seconds. """
    QUOTES = ["Wherever you go, there you are.",
              "80% of all statistics are made up.",
              "If a tree falls in the woods, and no one is around to hear it, does it make a noise?"]

    def __init__(self, interval: float = 10, *kargs, **kwargs):
        super().__init__(*kargs, **kwargs)
        self.interval = interval

    async def do_websocket(self, ws: web.WebSocketResponse):
        async def _regular_interval():
            # Stop once this client's connection has closed
            while self.srv.sockets is not None and not ws.closed:
                quote = random.choice(RandomQuoteServer.QUOTES)
                await ws.send_json({"quote": quote})
                await asyncio.sleep(self.interval)

        self.loop.create_task(_regular_interval())
        await super().do_websocket(ws)  # leave client connected here indefinitely
class TimeOfDayServer(MyServer):
    """ Sends a message to all clients simultaneously about time of day. """

    async def start(self):
        await super().start()

        async def _regular_interval():
            while self.srv.sockets is not None:
                if int(time.time()) % 10 == 0:  # Only on the 10 second mark
                    timestamp = "{:%Y-%m-%d %H:%M:%S}".format(datetime.datetime.now())
                    self.broadcast_message({"timestamp": timestamp})
                await asyncio.sleep(1)

        self.loop.create_task(_regular_interval())


if __name__ == "__main__":
    main()
Upvotes: 4
Reputation: 57188
Update: Channels was (fortunately) not merged into Django. It will probably remain a great project, but it didn't really belong in Django proper.
Also, I would highly recommend taking a look at Postgres's relatively new, built-in support for pub/sub (LISTEN/NOTIFY). It will probably outperform anything else, and building a custom solution atop aiohttp, using Postgres as the backing service, might be your best bet.
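To make the pub/sub idea concrete, here is a minimal sketch of the relay pattern, assuming each worker keeps only its own sockets and subscribes to one shared channel. InMemoryBus, FakeSocket and Worker are hypothetical names invented for this illustration; the bus is an in-process stand-in for a real broker such as Postgres LISTEN/NOTIFY or Redis, and FakeSocket stands in for web.WebSocketResponse.

```python
import asyncio


class InMemoryBus:
    """Hypothetical stand-in for a shared broker (Postgres LISTEN/NOTIFY, Redis, ...)."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self):
        # Every worker gets its own queue of incoming messages.
        queue = asyncio.Queue()
        self._subscribers.append(queue)
        return queue

    def publish(self, message):
        # A real broker would carry this across processes and machines.
        for queue in self._subscribers:
            queue.put_nowait(message)


class FakeSocket:
    """Hypothetical stand-in for web.WebSocketResponse; records what it was sent."""

    def __init__(self):
        self.sent = []

    async def send_str(self, text):
        self.sent.append(text)


class Worker:
    """One server process: owns its local sockets and relays bus messages to them."""

    def __init__(self, bus):
        self.bus = bus
        self.sockets = set()
        self._inbox = bus.subscribe()

    async def relay_once(self):
        # Wait for one message from the bus and forward it to local clients only.
        message = await self._inbox.get()
        for ws in list(self.sockets):
            await ws.send_str(message)


async def demo():
    bus = InMemoryBus()
    worker_a, worker_b = Worker(bus), Worker(bus)
    client_on_a, client_on_b = FakeSocket(), FakeSocket()
    worker_a.sockets.add(client_on_a)
    worker_b.sockets.add(client_on_b)

    # Application code running on worker B raises an event...
    worker_b.bus.publish("order shipped")
    # ...and every worker forwards it to its own clients.
    await asyncio.gather(worker_a.relay_once(), worker_b.relay_once())
    return client_on_a.sent, client_on_b.sent


sent_a, sent_b = asyncio.run(demo())
```

The point of the design is that no worker ever needs a reference to another worker's sockets: application code publishes to the broker, and each worker fans the message out to whatever connections it happens to hold.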
Though not aiohttp, Django Channels, which is likely to be merged into Django 1.10, solves this problem in a very intuitive way, and it's written by Andrew Godwin, the author of Django migrations.
Django Channels abstracts the notion of "many processes on many servers" by creating a routing layer in front of a Django app. This routing layer speaks with a backend (e.g., Redis) to maintain a sharable state among processes, and uses a new ASGI protocol to facilitate handling both HTTP requests and WebSockets, while delegating each to their respective "consumers" (e.g., ships with a built-in handler for HTTP requests, and you can write your own for WebSockets).
Django Channels has a concept called Groups, which handles the "broadcast" nature of the problem; that is to say, it allows an event which occurs on a server to trigger messages to clients that are in that Group, regardless of whether they're connected to the same or different process or server.
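The Groups idea can be sketched in a few lines of plain Python. This is not the Channels API: GroupLayer and its method names are hypothetical, and the dictionaries stand in for a shared backend such as Redis that every process would read and write.

```python
from collections import defaultdict


class GroupLayer:
    """Hypothetical shared store mapping group names to client channel ids.

    In a real deployment this state would live in a backend such as Redis,
    so that every process sees the same membership.
    """

    def __init__(self):
        self._members = defaultdict(set)   # group name -> channel ids
        self._outbox = defaultdict(list)   # channel id -> pending messages

    def group_add(self, group, channel):
        self._members[group].add(channel)

    def group_discard(self, group, channel):
        self._members[group].discard(channel)

    def group_send(self, group, message):
        # Queue the message for every member, whichever process owns the socket.
        for channel in self._members[group]:
            self._outbox[channel].append(message)

    def receive(self, channel):
        # Each process drains the outbox only for the channels it owns.
        pending, self._outbox[channel] = self._outbox[channel], []
        return pending


layer = GroupLayer()
layer.group_add("chat", "server-a:client-1")
layer.group_add("chat", "server-b:client-7")
layer.group_add("news", "server-b:client-8")

# An event on any server can address the whole group by name.
layer.group_send("chat", "hello")

delivered_a1 = layer.receive("server-a:client-1")
delivered_b7 = layer.receive("server-b:client-7")
delivered_b8 = layer.receive("server-b:client-8")
```

Both "chat" members receive the message even though they are attached to different servers, while the "news" member receives nothing.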
IMHO, Django Channels is very likely to be abstracted into a more general Python library. There are a couple of other Python libraries that achieve Go-like Channels but, as of this writing, there is nothing noteworthy that offers network transparency, that is, the ability for Channels to communicate between processes and servers.
Upvotes: 1
Reputation: 3109
So I am only familiar with Socket.IO in Node but it's fairly easy to scale websockets horizontally with Socket.IO.
Sockets can come with Sessions, so each session is managed by a specific server. This makes it easy to save state for each socket that is open, and load balance across all of your servers.
Here is SocketIO for Python:
https://pypi.python.org/pypi/socketIO-client
Here is a really good read on how to attach sessions to a redis-store to make it even faster and load balancing across servers more manageable.
How to share sessions with Socket.IO 1.x and Express 4.x?
I know this doesn't answer your question about aiohttp, but hopefully this will give you a better idea about how sockets can work.
Edit: Written in Node-
In Socket.IO this is really easy, it has a ton of functions to broadcast messages in a variety of different ways.
For your example, if you would like to emit a message to everyone who has a socket open (apart from the sending socket itself), you can simply write:
socket.broadcast.emit('WARNING', "this is a test");
Let's say you have rooms open: you can broadcast messages only to people in a given room with a simple function called .to(). For example, I have a room named 'BBQ':
socket.broadcast.to('BBQ').emit('invitation', 'Come get some food!');
This will send the message "Come get some food!" to everyone in the room BBQ.
Edit: Edit:
This is a fantastic write-up on how Socket.IO works; make sure you read the second answer for the updated versions of the functions. It is much easier to read than their documentation.
Send response to all clients except sender (Socket.io)
As far as I know, this is how it all works in the Python implementation as well. For ease of use I would certainly use it for websockets. aiohttp seems really powerful, but this functionality is either missing, buried in the documentation, or present only in the code without any documentation yet.
Upvotes: 3