Reputation: 4089
I want to use WebSockets in Python to keep web clients up to date with data that I am reading from a serial port using PySerial. I currently read the serial data continuously in a separate thread, using the following code:
def read_from_port():
    while running:
        reading = ser.readline().decode()
        handle_data(reading)

thread = threading.Thread(target=read_from_port)
thread.daemon = True
thread.start()
I process the serial data and then want to broadcast a message to all connected WebSocket clients whenever the calculated result differs from its previous value. For this I have set up the following code:
clients = []

def Broadcast(message):
    for client in clients:
        client.write_message(message)
        print("broadcasted")

worker.broadcast = Broadcast

class WSHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print('new connection')
        clients.append(self)

    def on_message(self, message):
        print('message received: %s' % message)
        response = handler.HandleRequest(message, self.write_message)

    def on_close(self):
        print('connection closed')
        clients.remove(self)

    def check_origin(self, origin):
        return True

application = tornado.web.Application([
    (r'/ws', WSHandler),
])

if __name__ == "__main__":
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8765)
    myIP = socket.gethostbyname(socket.gethostname())
    print('*** Websocket Server Started at %s***' % myIP)
    tornado.ioloop.IOLoop.instance().start()
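The "broadcast only when the result changes" step mentioned earlier is not part of the code shown. A minimal sketch of how it could look, assuming a hypothetical ChangeBroadcaster helper (the class and its method names are illustrative, not from the original code):

```python
class ChangeBroadcaster:
    """Forward a result to `broadcast` only when it differs from the last one."""

    def __init__(self, broadcast):
        self._broadcast = broadcast
        self._last = object()  # sentinel that never equals a real result

    def update(self, result):
        """Return True if the result was broadcast, False if it was unchanged."""
        if result != self._last:
            self._last = result
            self._broadcast(result)
            return True
        return False


# Usage: a list stands in for the real broadcast function.
sent = []
notifier = ChangeBroadcaster(sent.append)
for value in [1, 1, 2, 2, 2, 3]:
    notifier.update(value)
# sent is now [1, 2, 3]
```

In the worker, handle_data would compute the result and pass it to update(), with the real broadcast function supplied in place of sent.append.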
I then want to call this broadcast function from the worker to send out a result. Calling it from the worker thread produces the following error:
  File "main.py", line 18, in Broadcast
    client.write_message(message)
  File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 342, in write_message
    return self.ws_connection.write_message(message, binary=binary)
  File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 1098, in write_message
    fut = self._write_frame(True, opcode, message, flags=flags)
  File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 1075, in _write_frame
    return self.stream.write(frame)
  File "/usr/local/lib/python3.8/site-packages/tornado/iostream.py", line 555, in write
    future = Future() # type: Future[None]
  File "/usr/local/Cellar/python@3.8/3.8.3_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/events.py", line 639, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Thread-1'.
I understand that Tornado's write_message function is not thread-safe and that this error occurs because I am calling it directly from the worker thread. As far as I can determine, the recommended way to run concurrent code with Tornado is through asyncio, but I think a threading approach may be more appropriate here, since my read loop essentially runs in parallel all the time.
Unfortunately, I know very little about asyncio or how threading is implemented in Python, so I would like to know the simplest way to send WebSocket messages from a different thread.
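For reference, the RuntimeError in the traceback can be reproduced without Tornado or a serial port. The sketch below uses only the standard library; probe and run_probe are hypothetical names chosen for illustration. It asks asyncio for the current event loop from a plain worker thread, which has none:

```python
import asyncio
import threading


def probe(result):
    """Ask asyncio for the current event loop and record what happens."""
    try:
        asyncio.get_event_loop()
        result.append("loop found")
    except RuntimeError as exc:
        # A plain worker thread has no event loop associated with it.
        result.append(str(exc))


def run_probe():
    """Run probe() on a separate thread and return the recorded outcome."""
    result = []
    worker = threading.Thread(target=probe, args=(result,), name="Thread-1")
    worker.start()
    worker.join()
    return result[0]


if __name__ == "__main__":
    print(run_probe())
```

This is the same failure path as in the traceback: the write ends up needing the event loop of the calling thread, and the worker thread does not have one.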
Upvotes: 6
Views: 2961
Reputation: 1343
One cleaner option is to use a messaging library such as pyzmq to pass messages from one thread to another.
For your use case, the PUB/SUB model fits. You can also use the 'inproc' transport instead of 'tcp', which reduces latency because the communication stays between threads of the same process.
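A minimal sketch of the PUB/SUB idea over 'inproc', assuming pyzmq is installed. The endpoint name, the message contents and the function names are hypothetical, and the subscriber here only stands in for whatever thread would forward results to the WebSocket clients:

```python
import threading
import time

import zmq  # third-party: pip install pyzmq

ENDPOINT = "inproc://serial-results"  # hypothetical endpoint name
ctx = zmq.Context.instance()          # inproc sockets must share one context


def subscriber(ready, received):
    """Stand-in for the thread that would pass results on to clients."""
    sub = ctx.socket(zmq.SUB)
    sub.connect(ENDPOINT)
    sub.setsockopt_string(zmq.SUBSCRIBE, "")  # empty prefix: all messages
    ready.set()
    if sub.poll(timeout=2000):                # wait up to 2 s for a message
        received.append(sub.recv_json())
    sub.close(linger=0)


def run_demo():
    pub = ctx.socket(zmq.PUB)
    pub.bind(ENDPOINT)                        # bind before the subscriber connects
    ready, received = threading.Event(), []
    thread = threading.Thread(target=subscriber, args=(ready, received))
    thread.start()
    ready.wait()
    # A PUB socket drops messages until the subscription has been registered,
    # so this demo resends until the subscriber thread has finished.
    for _ in range(100):
        if not thread.is_alive():
            break
        pub.send_json({"result": 42})
        time.sleep(0.01)
    thread.join()
    pub.close(linger=0)
    return received


if __name__ == "__main__":
    print(run_demo())
```

This covers only the thread-to-thread transport. The receiving side would still have to hand each message to Tornado on its own event-loop thread before calling write_message.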
Upvotes: 0
Reputation: 4089
The official documentation on using asyncio together with multithreading (https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading) gave me the necessary clue: this can be done quite elegantly with the "call_soon_threadsafe" function. The following code seems to do the trick:
import asyncio
import socket

import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.websocket

# `worker` and `handler` are my own modules (not shown here).

tornado.ioloop.IOLoop.configure("tornado.platform.asyncio.AsyncIOLoop")
io_loop = tornado.ioloop.IOLoop.current()
asyncio.set_event_loop(io_loop.asyncio_loop)

clients = []

def bcint(message):
    # Runs on the event-loop thread, so write_message is safe to call here.
    for client in clients:
        client.write_message(message)
        print("broadcasted")

def Broadcast(message):
    # Called from the worker thread: schedule bcint on the event loop.
    io_loop.asyncio_loop.call_soon_threadsafe(bcint, message)

worker.broadcast = Broadcast

class WSHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print('new connection')
        clients.append(self)

    def on_message(self, message):
        print('message received: %s' % message)
        response = handler.HandleRequest(message, self.write_message)

    def on_close(self):
        print('connection closed')
        clients.remove(self)

    def check_origin(self, origin):
        return True

application = tornado.web.Application([
    (r'/ws', WSHandler),
])

if __name__ == "__main__":
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8765)
    myIP = socket.gethostbyname(socket.gethostname())
    print('*** Websocket Server Started at %s***' % myIP)
    tornado.ioloop.IOLoop.current().start()
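The hand-off itself can be seen in isolation with the standard library alone. This is a minimal sketch, not the code above: deliver plays the role of bcint, worker plays the role of the serial-reading thread, and run_demo is a hypothetical name for illustration.

```python
import asyncio
import threading


def run_demo():
    """Schedule a callback on the event loop from a separate worker thread."""
    received = []

    async def main():
        loop = asyncio.get_running_loop()
        loop_thread = threading.current_thread()
        done = asyncio.Event()

        def deliver(message):
            # Executed by the event loop, i.e. on the loop's own thread.
            on_loop_thread = threading.current_thread() is loop_thread
            received.append((message, on_loop_thread))
            done.set()

        def worker():
            # Executed on a plain thread; only schedules the callback.
            loop.call_soon_threadsafe(deliver, "reading changed")

        threading.Thread(target=worker, daemon=True).start()
        await asyncio.wait_for(done.wait(), timeout=5)

    asyncio.run(main())
    return received


if __name__ == "__main__":
    print(run_demo())  # [('reading changed', True)]
```

The worker thread never touches loop-bound objects directly; it only asks the loop to run the callback. Tornado's own IOLoop.add_callback is documented as safe to call from any thread, so io_loop.add_callback(bcint, message) should work as an alternative to going through asyncio_loop, although that variant is not tested here.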
Upvotes: 3