Gerharddc

Reputation: 4089

Python Tornado send WebSocket messages from another thread

I want to use WebSockets in Python to keep web clients up to date with data that I am reading from a serial port using PySerial. I currently read the serial data continuously on a separate thread with the following code:

def read_from_port():
    while running:
        reading = ser.readline().decode()
        handle_data(reading)

thread = threading.Thread(target=read_from_port)
thread.daemon = True
thread.start()

I process the serial data and then want to broadcast a message to all connected WebSocket clients whenever the calculated result differs from its previous value. For this I have set up the following code:

clients = []

def Broadcast(message):
    for client in clients:
        client.write_message(message)
        print("broadcasted")

worker.broadcast = Broadcast

class WSHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print('new connection')
        clients.append(self)
      
    def on_message(self, message):
        print('message received:  %s' % message)
        response = handler.HandleRequest(message, self.write_message)
 
    def on_close(self):
        print('connection closed')
        clients.remove(self)
 
    def check_origin(self, origin):
        return True
 
application = tornado.web.Application([
    (r'/ws', WSHandler),
])
 
if __name__ == "__main__":
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8765)
    myIP = socket.gethostbyname(socket.gethostname())
    print('*** Websocket Server Started at %s***' % myIP)
    tornado.ioloop.IOLoop.instance().start()

I then want to use the "broadcast" method in the worker to broadcast a result. Calling this method from the worker thread produces the following error:

File "main.py", line 18, in Broadcast
    client.write_message(message)
  File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 342, in write_message
    return self.ws_connection.write_message(message, binary=binary)
  File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 1098, in write_message
    fut = self._write_frame(True, opcode, message, flags=flags)
  File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 1075, in _write_frame
    return self.stream.write(frame)
  File "/usr/local/lib/python3.8/site-packages/tornado/iostream.py", line 555, in write
    future = Future()  # type: Future[None]
  File "/usr/local/Cellar/python@3.8/3.8.3_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/events.py", line 639, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Thread-1'.

I understand that the Tornado write_message function is not thread-safe and that the error occurs because I am calling it directly from the worker thread. As far as I can determine, the recommended way to write concurrent code with Tornado is through asyncio, but I think a threading approach is more appropriate here, since I have a loop that essentially runs in parallel all the time.

Unfortunately I know very little about asyncio or about how threading is implemented in Python, so I would like to know the simplest way to send WebSocket messages from a different thread.

Upvotes: 6

Views: 2961

Answers (2)

Dhiraj Dhule

Reputation: 1343

One cleaner option is to use a messaging library such as pyzmq to pass messages from one thread to another.

For your use case, the PUB/SUB model is a good fit. You can also use the 'inproc' transport instead of 'tcp', which reduces latency because the threads communicate within the same process.
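As a rough illustration of the queue idea only, here is a minimal sketch that uses the standard library's queue.Queue as a stand-in rather than pyzmq itself. The names results, STOP, producer and consume are hypothetical and do not come from the question.

```python
import queue
import threading

results = queue.Queue()  # thread-safe FIFO shared by both threads
STOP = object()          # sentinel that tells the consumer to finish


def producer():
    # Stands in for the serial-reading worker thread.
    for value in ("a", "b", "c"):
        results.put(value)
    results.put(STOP)


def consume():
    # Stands in for the code that would forward each item to the clients.
    seen = []
    while True:
        item = results.get()  # blocks until an item is available
        if item is STOP:
            return seen
        seen.append(item)


thread = threading.Thread(target=producer)
thread.start()
delivered = consume()
thread.join()
```

Note that a blocking get like this should not run on the IOLoop thread, so a real server would still need a thread-safe way to hand each item over to the loop.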

Upvotes: 0

Gerharddc

Reputation: 4089

The official documentation on using asyncio together with multithreading, at https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading, gave me the necessary clue: this can be done quite elegantly with the "call_soon_threadsafe" function. The following code seems to do the trick:

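import asyncio
import socket

import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.websocket

# "worker" and "handler" come from the question's own code and are assumed to be imported elsewhere.

# Run Tornado on the asyncio event loop and keep a reference to that loop
tornado.ioloop.IOLoop.configure("tornado.platform.asyncio.AsyncIOLoop")
io_loop = tornado.ioloop.IOLoop.current()
asyncio.set_event_loop(io_loop.asyncio_loop)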

clients = []

def bcint(message):
    for client in clients:
        client.write_message(message)
        print("broadcasted")

def Broadcast(message):
    io_loop.asyncio_loop.call_soon_threadsafe(bcint, message)

worker.broadcast = Broadcast

class WSHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print('new connection')
        clients.append(self)
      
    def on_message(self, message):
        print('message received:  %s' % message)
        response = handler.HandleRequest(message, self.write_message)
 
    def on_close(self):
        print('connection closed')
        clients.remove(self)
 
    def check_origin(self, origin):
        return True
 
application = tornado.web.Application([
    (r'/ws', WSHandler),
])
 
if __name__ == "__main__":
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8765)
    myIP = socket.gethostbyname(socket.gethostname())
    print('*** Websocket Server Started at %s***' % myIP)
    tornado.ioloop.IOLoop.current().start()
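
To try the hand-off in isolation, here is a minimal sketch that needs only the standard library and no Tornado. The names received, deliver, worker and main are hypothetical; deliver plays the role of bcint above and worker plays the role of the serial-reading thread.

```python
import asyncio
import threading

received = []  # (thread name, message) pairs recorded on the loop thread


def deliver(message):
    # Runs on the event loop thread; stands in for the write_message calls.
    received.append((threading.current_thread().name, message))


def worker(loop, done):
    # Runs on a separate thread and never touches loop-owned objects directly.
    for value in (1, 2, 3):
        loop.call_soon_threadsafe(deliver, value)
    # Scheduled through the loop as well, so it runs after the deliveries.
    loop.call_soon_threadsafe(done.set)


async def main():
    loop = asyncio.get_running_loop()
    done = asyncio.Event()
    thread = threading.Thread(target=worker, args=(loop, done), name="reader")
    thread.start()
    await done.wait()  # the loop keeps running callbacks while we wait
    thread.join()


asyncio.run(main())
```

Every call to deliver happens on the loop's thread rather than on the "reader" thread, which is the property write_message relies on. Tornado's own IOLoop.add_callback is also documented as safe to call from other threads, so it should work as an alternative to reaching for the underlying asyncio loop.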

Upvotes: 3
