JayK23

Reputation: 263

Python - how to receive data and serve it as a websocket server?

I'm facing a problem with a Python application I'm trying to build. I have a script that receives simple JSON data from a Redis pub/sub connection, and I would like to serve this data to clients as a WebSocket server: every time a message arrives from the Redis connection, it must be sent to the connected clients over WebSockets.

Here is my basic code:

This is the part that receives data from the Redis pub/sub connection:

import json
import redis

redis_url = 'MY-URL'
channel = 'test'

connection = redis.StrictRedis.from_url(redis_url, decode_responses=True)

pubsub = connection.pubsub(ignore_subscribe_messages=False)
pubsub.subscribe(channel)


for item in pubsub.listen():
    message = item['data']

    if type(message) != int:
        message = json.loads(message)
        print(message)

And here is a simple WebSocket server, built with the websockets library:

import asyncio
import websockets

async def main(websocket, path):
    while True:
        await websocket.send('Some data')

start_server = websockets.serve(main, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

I don't see how to merge these two pieces of code. Is there a way to accomplish this?

Upvotes: 0

Views: 3606

Answers (1)

Chandan

Reputation: 11807

Solution 1

In this example, each new WebSocket connection registers its own listener on the Redis channel:

import json
import redis
import asyncio
import websockets

redis_url = 'redis://localhost:6379/0'
channel = 'test'

connection = redis.StrictRedis.from_url(redis_url, decode_responses=True)

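async def main(websocket, path):
    # Each WebSocket connection gets its own subscription, so every client
    # receives every message published on the channel.
    pubsub = connection.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe(channel)

    try:
        while True:
            # Poll instead of using the blocking listen(), which would
            # stall the event loop and starve other connections.
            item = pubsub.get_message()
            if item is None:
                await asyncio.sleep(0.01)
                continue

            message = json.loads(item['data'])
            print(message)
            # send() accepts str or bytes, not a dict, so serialize again.
            await websocket.send(json.dumps(message))
    finally:
        pubsub.close()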

start_server = websockets.serve(main, "localhost", 8765)

print("Started")
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
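
The step that decides what gets forwarded to the socket can also be pulled out into a small pure function, which makes it easy to check without Redis or a WebSocket client. This is only a sketch: the name `to_ws_payload` is made up for illustration, and it assumes the usual redis-py pub/sub item shape (a dict with `type` and `data` keys, where subscribe confirmations carry an integer count in `data`).

```python
import json


def to_ws_payload(item):
    """Return a JSON string to forward, or None if the item should be skipped.

    Hypothetical helper: `item` is assumed to be a redis-py pub/sub dict.
    """
    # Skip empty polls and subscribe/unsubscribe confirmations.
    if item is None or item.get('type') != 'message':
        return None

    try:
        parsed = json.loads(item['data'])
    except (TypeError, ValueError):
        # Not valid JSON (or not a string at all): do not forward it.
        return None

    # WebSocket frames carry text or bytes, so hand back a string, not a dict.
    return json.dumps(parsed)
```

Inside the handler you could then write `payload = to_ws_payload(item)` and only call `await websocket.send(payload)` when it is not `None`.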

Solution 2

Here a single listener receives the events from Redis and sends each message to every client that has registered through an open WebSocket connection:

import json
import redis
import gevent
from flask import Flask
from flask_sockets import Sockets

redis_url = 'redis://localhost:6379/0'
channel = 'test'

connection = redis.StrictRedis.from_url(redis_url, decode_responses=True)

class PubSubListener(object):
    def __init__(self):
        self.clients = []
        self.pubsub = connection.pubsub(ignore_subscribe_messages=False)
        self.pubsub.subscribe(**{channel: self.handler})
        self.thread = self.pubsub.run_in_thread(sleep_time=0.001)

    def register(self, client):
        self.clients.append(client)

    def handler(self, message):
        _message = message['data']

        if type(_message) != int:
            self.send(_message)

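    def send(self, data):
        # Iterate over a copy: removing from the list being iterated
        # would skip the client that follows the removed one.
        for client in list(self.clients):
            try:
                client.send(data)
            except Exception:
                # Drop clients whose connection has gone away.
                self.clients.remove(client)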

pslistener = PubSubListener()

app = Flask(__name__)
sockets = Sockets(app)

@sockets.route('/echo')
def echo_socket(ws):
    pslistener.register(ws)

    while not ws.closed:
        gevent.sleep(0.1)

@app.route('/')
def hello():
    return 'Hello World!'


if __name__ == "__main__":
    from gevent import pywsgi
    from geventwebsocket.handler import WebSocketHandler
    print("Started")
    server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
    server.serve_forever()
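
If you would rather stay with asyncio and the websockets library from the question, the same single-listener idea can be sketched with one `asyncio.Queue` per client. This is a minimal illustration using only the standard library, not a drop-in server: `Broadcaster` and `demo` are hypothetical names, and the Redis listener and the WebSocket handler are left out.

```python
import asyncio


class Broadcaster:
    """Fan one stream of messages out to many subscribers (hypothetical helper)."""

    def __init__(self):
        self._queues = set()

    def register(self):
        # Each client gets its own queue so a slow client does not block others.
        queue = asyncio.Queue()
        self._queues.add(queue)
        return queue

    def unregister(self, queue):
        self._queues.discard(queue)

    def publish(self, message):
        # Iterate over a copy so unregister() during delivery is safe.
        for queue in list(self._queues):
            queue.put_nowait(message)


async def demo():
    broadcaster = Broadcaster()
    first = broadcaster.register()
    second = broadcaster.register()

    broadcaster.publish('{"price": 10}')
    broadcaster.unregister(second)       # second client disconnects
    broadcaster.publish('{"price": 11}')

    got_first = [await first.get(), await first.get()]
    got_second = [await second.get()]
    return got_first, got_second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a real server, the single Redis listener would call `publish()` for every message, and each WebSocket handler would call `register()`, loop on `await websocket.send(await queue.get())`, and call `unregister()` in a `finally` block when the connection closes.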

Upvotes: 2
