Stefan B.

Reputation: 1

Python "proxy" for local socket and websocket hogs CPU

My current project is a Raspberry Pi 4 mounted on a DIN rail with two extra circuit boards. It should collect data from various sources (e.g. CAN, RS485, GPIO pins, S0 pulse counts from energy/water/gas meters), store the counted values in an EERAM, and display them in real time on a webpage.

For every source there is one Python script (run as a daemon) that collects the data and, if necessary (on change), stores it or transfers it to a local socket opened on port 10000.

The main "proxy" daemon listens on port 10000 and forwards the received data to the websocket clients (one or more) on port 9999. The data is then inserted into the webpage with JavaScript, which automatically reconnects if the server is not available.

To interact with the system, the user can press a button or enter text, which is sent as a command through the "proxy" daemon. The daemon immediately distributes the command to the connected local socket clients.

Simplified, the daemon has three tasks:

I've read many sources about threading, sockets, asyncio, and event loops, but my already working script hogs the CPU as soon as a local socket is connected. The cause is the loop in line 67. Is there a better way to achieve the goal, preferably without the thread?

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time, threading, socket, signal, selectors, asyncio, types, urllib.parse
import tornado.httpserver, tornado.websocket, tornado.ioloop, tornado.web

wsclients = []
host = 'localhost'
port = 10000
websocket_port = 9999
commands = []
sel = 0
lsock = 0

class WSHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        wsclients.append(self)
        print("[WS] New websocket connection from", self.request.remote_ip)

    def check_origin(self, origin):  
        parsed_origin = urllib.parse.urlparse(origin)
        print("[WS] origin:", origin)
        return parsed_origin.netloc.endswith("raspi")
        
    def on_message(self, message):
        print("[WS] Received:", message)
        # Forward message to localhost socket clients
        commands.append(message)

    def on_close(self):
        wsclients.remove(self)
        print("[WS] Closed connection from", self.request.remote_ip)


def broadcast(message):
    global wsclients
    for client in wsclients:
        client.write_message(message)
    

def init_socket():
    global sel
    global lsock

    sel = selectors.DefaultSelector()
    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    lsock.bind((host, port))
    lsock.listen()
    print("[LS] Server listening on %s %d" %(host, port))
    lsock.setblocking(False)
    sel.register(lsock, selectors.EVENT_READ, data=None)
    
    t = threading.Thread(target=socket_loop)
    t.daemon=True
    t.start()

def socket_loop():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.create_task(socket_event())
    loop.run_forever()
    loop.stop()

async def socket_event():
    while True:
        events = sel.select(timeout=None)
        for key, mask in events:
            if key.data is None:
                accept_wrapper(key.fileobj)
            else:
                service_connection(key, mask)

def accept_wrapper(sock):
    conn, addr = sock.accept()
    print("[LS] Connected with " + addr[0] + ":" + str(addr[1]))
    conn.setblocking(False)
    data = types.SimpleNamespace(addr=addr, inb=b'', outb=b'', port=addr[1])
    sel.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)

def service_connection(key, mask):
    sock = key.fileobj
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(1024)
        if recv_data:
            print("[LS] Received:", recv_data)
            # Forward message to websocket clients
            broadcast(str(recv_data.decode()) + "\n")
        else:
            print('[LS] closing connection to', data.addr)
            sel.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if not data.outb and commands:
            data.outb = commands.pop(0).encode()
        if data.outb:
            print('[LS] sending', repr(data.outb), 'to connection port', data.port)
            sent = sock.send(data.outb)
            data.outb = data.outb[sent:]


async def shutdown():
    for client in wsclients:
        client.close()
    http_server.stop()
    lsock.close()
    tornado.ioloop.IOLoop.current().stop()
    print("[*] Exiting..")

def exit_handler(sig, frame):
#    print("[*] Signal:" + str(sig))
    tornado.ioloop.IOLoop.instance().add_callback_from_signal(shutdown)

app = tornado.web.Application([(r'/', WSHandler),])

if __name__ == "__main__":
    signal.signal(signal.SIGTERM, exit_handler)
    signal.signal(signal.SIGINT,  exit_handler)

    init_socket()
    
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(websocket_port)
    print("[WS] Webserver listening on %d" %(websocket_port))

    tornado.ioloop.IOLoop.instance().start()
    #    t.join()

Upvotes: 0

Views: 433

Answers (1)

Ben Darnell

Reputation: 22134

The main issue that you're running into is that you only want to register for selectors.EVENT_WRITE when you have data in the output buffer to send. Otherwise the selector will (correctly) tell you that you can write to the socket every time through the loop, but service_connection won't have anything to do. You need to update your registration based on the state of the output buffer.
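
As a rough sketch of that idea (not a drop-in patch for your script): register each connection for EVENT_READ only, switch EVENT_WRITE on with sel.modify() when something is queued, and switch it off again once the buffer drains. The helper names queue_output and demo are made up for illustration, and a socket.socketpair() stands in for a real client connection.

```python
import selectors
import socket
import types

sel = selectors.DefaultSelector()


def queue_output(sock, data, payload):
    """Append payload to the output buffer and start watching for writability."""
    data.outb += payload
    sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)


def service_connection(key, mask):
    sock, data = key.fileobj, key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(1024)
        if recv_data:
            data.inb += recv_data
        else:
            # Peer closed the connection
            sel.unregister(sock)
            sock.close()
            return
    if mask & selectors.EVENT_WRITE and data.outb:
        sent = sock.send(data.outb)
        data.outb = data.outb[sent:]
        if not data.outb:
            # Buffer drained: stop asking for write events
            sel.modify(sock, selectors.EVENT_READ, data=data)


def demo():
    # Hypothetical stand-in for an accepted client connection
    a, b = socket.socketpair()
    a.setblocking(False)
    data = types.SimpleNamespace(inb=b"", outb=b"")
    sel.register(a, selectors.EVENT_READ, data=data)

    # Nothing to read and no write interest: select() has nothing to report
    idle = sel.select(timeout=0.05)

    queue_output(a, data, b"hello")
    for key, mask in sel.select(timeout=1):
        service_connection(key, mask)
    received = b.recv(1024)

    # The buffer is empty again, so the selector is quiet again
    after = sel.select(timeout=0.05)

    sel.unregister(a)
    a.close()
    b.close()
    return idle, received, after
```

With the registration tied to the buffer state, an idle connection no longer wakes select() on every pass, so the loop actually blocks until there is work to do.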

You're also combining a lot of things together that aren't really necessary. The selectors module is the kind of thing you'd use to implement your own event loop, so it's incorrect to run it from inside another event loop (but since you don't have anything else running on the asyncio loop in your thread it's harmless in this case). Pick just one of these four options to implement your server: Tornado, asyncio, threading with selector, or threading without selector (blocking).
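
For comparison, here is roughly what the plain asyncio option could look like, with no thread and no selectors module. Hub, handle_client, send_command and demo are illustrative names, the received list stands in for your broadcast() function, and port 0 just asks the OS for a free port for the demo.

```python
import asyncio


class Hub:
    def __init__(self):
        self.writers = set()
        self.received = []  # stand-in for broadcast() to the websocket clients

    async def handle_client(self, reader, writer):
        """Runs once per local socket connection."""
        self.writers.add(writer)
        try:
            while True:
                chunk = await reader.read(1024)  # suspends until data arrives
                if not chunk:  # empty bytes means the peer closed
                    break
                self.received.append(chunk)
        finally:
            self.writers.discard(writer)
            writer.close()

    def send_command(self, message):
        """Queue a command for every connected local client."""
        for writer in list(self.writers):
            writer.write(message.encode())


async def demo():
    hub = Hub()
    server = await asyncio.start_server(hub.handle_client, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    # A throwaway client playing the role of one data-collecting script
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"counter=42")
    await writer.drain()
    await asyncio.sleep(0.1)  # give the server side a moment to read

    hub.send_command("reset")
    reply = await reader.read(1024)

    writer.close()
    await writer.wait_closed()
    server.close()
    return hub.received, reply
```

Each connection is a coroutine that sleeps inside reader.read() until data arrives, so nothing spins while the sockets are idle.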

I'd recommend using Tornado since that's what you're using for the websockets. You have a couple of options here depending on how much you like low-level socket programming. You can use IOLoop.add_handler to write some code very much like what you have in service_connection, or you can use IOStream to have a higher-level interface.
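
A minimal sketch of the IOStream route, assuming Tornado 6 or later: TCPServer hands each accepted connection to handle_stream as an IOStream, so the accept loop and the write buffering are handled for you. LocalSocketServer, on_data, send_command and demo are hypothetical names; in your daemon, on_data would be your broadcast function and send_command would be called from WSHandler.on_message.

```python
import asyncio

import tornado.iostream
import tornado.netutil
import tornado.tcpserver


class LocalSocketServer(tornado.tcpserver.TCPServer):
    def __init__(self, on_data):
        super().__init__()
        self.streams = set()
        self.on_data = on_data  # called with every chunk received

    async def handle_stream(self, stream, address):
        """Called by Tornado once per accepted connection."""
        self.streams.add(stream)
        try:
            while True:
                # partial=True returns as soon as any data is available,
                # similar to sock.recv(1024)
                chunk = await stream.read_bytes(1024, partial=True)
                self.on_data(chunk)
        except tornado.iostream.StreamClosedError:
            pass
        finally:
            self.streams.discard(stream)

    def send_command(self, message):
        """Queue a command on every open local connection."""
        for stream in list(self.streams):
            if not stream.closed():
                stream.write(message.encode())


async def demo():
    received = []
    server = LocalSocketServer(on_data=received.append)
    # Port 0 lets the OS pick a free port for this demo
    sockets = tornado.netutil.bind_sockets(0, address="127.0.0.1")
    server.add_sockets(sockets)
    port = sockets[0].getsockname()[1]

    # A throwaway client playing the role of one data-collecting script
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"counter=42")
    await writer.drain()
    await asyncio.sleep(0.1)  # give the server side a moment to read

    server.send_command("reset")
    reply = await reader.read(1024)

    writer.close()
    await writer.wait_closed()
    await asyncio.sleep(0.1)  # let handle_stream notice the close
    server.stop()
    return received, reply
```

Because this server runs on the same IOLoop as the websocket handlers, the extra thread goes away and write_message is always called from the loop's own thread.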

Upvotes: 1
