Reputation: 263
I'm facing a problem with a python application i'm trying to build: basically i have a script that receives simple json data from a Redis PUBSUB connection, and i would like to serve this data to client as a websocket server. So basically, every time i receive a message from the redis connection, that message must be sent to clients using websockets.
Here is my basic code:
The part where i receive data from the redis pubsub connection:
import json
import redis
redis_url = 'MY-URL'
channel = 'test'
connection = redis.StrictRedis.from_url(redis_url, decode_responses=True)
pubsub = connection.pubsub(ignore_subscribe_messages=False)
pubsub.subscribe(channel)
for item in pubsub.listen():
message = item['data']
if type(message) != int:
message = json.loads(message)
print(message)
And here is a simple websocket server, i'm using websockets
:
import asyncio
import websockets
async def main(websocket, path):
while True:
await websockets.send('Some data')
start_server = websockets.serve(main, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
I'm missing the way to merge this two parts of code. Is there any way to accomplish this?
Upvotes: 0
Views: 3606
Reputation: 11807
In this example for each new websocket
connection request we are registering new listener
import json
import redis
import asyncio
import websockets
redis_url = 'redis://localhost:6379/0'
channel = 'test'
connection = redis.StrictRedis.from_url(redis_url, decode_responses=True)
pubsub = connection.pubsub(ignore_subscribe_messages=False)
pubsub.subscribe(channel)
async def main(websocket, path):
for item in pubsub.listen():
message = item['data']
if type(message) != int:
message = json.loads(message)
print(message)
await websocket.send(message)
start_server = websockets.serve(main, "localhost", 8765)
print("Started")
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
Here we have used single listerer to send events to multiple clients which register through websocket and send messages to opened websocket connection
import json
import redis
import gevent
from flask import Flask
from flask_sockets import Sockets
redis_url = 'redis://localhost:6379/0'
channel = 'test'
connection = redis.StrictRedis.from_url(redis_url, decode_responses=True)
class PubSubListener(object):
def __init__(self):
self.clients = []
self.pubsub = connection.pubsub(ignore_subscribe_messages=False)
self.pubsub.subscribe(**{channel: self.handler})
self.thread = self.pubsub.run_in_thread(sleep_time=0.001)
def register(self, client):
self.clients.append(client)
def handler(self, message):
_message = message['data']
if type(_message) != int:
self.send(_message)
def send(self, data):
for client in self.clients:
try:
client.send(data)
except Exception:
self.clients.remove(client)
pslistener = PubSubListener()
app = Flask(__name__)
sockets = Sockets(app)
@sockets.route('/echo')
def echo_socket(ws):
pslistener.register(ws)
while not ws.closed:
gevent.sleep(0.1)
@app.route('/')
def hello():
return 'Hello World!'
if __name__ == "__main__":
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
print("Started")
server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
server.serve_forever()
Upvotes: 2