user3732793
user3732793

Reputation: 1959

Tornado asynchronous coroutine

Longtime didn't use tornado. I would like to have a websocket which get's updates from a serial device of a host where tornado runs. So I tried multiprocessing with tornado but the process has no access to the tornado websocket. I tried to incorporate it as coroutine but that seems to not not spawn.

class WebApplication(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r'/', IndexPageHandler),
            (r"/config", ConfigHandler),
            (r"/shutdown", ShutdownHandler),
            (r'/websocket', WebSocketHandler),
            (r'/(.*)', tornado.web.StaticFileHandler, {'path': resourcesWeb})
        ]

        settings = {
            'debug': debug,
            'static_path': resourcesWeb,
            'template_path': 'templates'
        }
        tornado.web.Application.__init__(self, handlers, **settings)

    @gen.coroutine
    def serial_reader(self):
        log('serial_reader: start')
        done = False
        while not done:
            sh.read()
            serial_data_from = str(sh.data)
            if len(serial_data_from) > 0:
                if debug:
                    log('serial read:' + serial_data_from)
                    yield [con.write_message(serial_data_from) for con in WebSocketHandler.connections]
            yield gen.sleep(0.3)
        log('serial_reader: exit')

Python 3.8.5, Tornad 6.1

how would I properly and constantly update a websocket with data from outside the the tornado app

Upvotes: 1

Views: 151

Answers (1)

friedcell
friedcell

Reputation: 150

Since sh.read is blocking, you'll need to run it in an executor. To then notify clients in the main thread, you'll need to use IOLoop.add_callback (safe to call from any thread). This also means the reader method becomes a regular sync method.

Example:

from concurrent.futures import ThreadPoolExecutor
import functools

from tornado import web, websocket, ioloop

log = print


class IndexHandler(web.RequestHandler):
    def get(self):
        self.write("""<html>
            <textarea cols="30" rows="10" id="output">%s</textarea><br />
            <a href="/start" target="f" onclick="log(this.innerHTML)">start</a><br />
            <a href="/stop" target="f" onclick="log(this.innerHTML)">stop</a><br />
            <iframe name="f" width="100" height="30"></iframe>
            <script>
                ws = new WebSocket("ws://localhost:8888/stream");
                out_el = document.getElementById("output");
                function log(data) {out_el.value = data + "\\n" + out_el.value;}
                ws.onmessage = function (ev) {log(ev.data);}
            </script>""" % "\n".join(map(str, reversed(self.application.read_data))))


class StartHandler(web.RequestHandler):
    def get(self):
        self.application.start_reader()
        self.write("Started")


class StopHandler(web.RequestHandler):
    def get(self):
        self.application.stop_reader()
        self.write("Stopped")


class WebSocketHandler(websocket.WebSocketHandler):
    connections = set()

    def open(self):
        WebSocketHandler.connections.add(self)

    def on_close(self):
        if self in WebSocketHandler.connections:
            WebSocketHandler.connections.remove(self)


class WebApplication(web.Application):
    def __init__(self, autostart=False):
        handlers = [
            (r"/", IndexHandler),
            (r"/start", StartHandler),
            (r"/stop", StopHandler),
            (r'/stream', WebSocketHandler),
        ]
        web.Application.__init__(self, handlers)
        self._reader_executor = ThreadPoolExecutor(1)
        self._keep_reading = None
        self.read_data = []
        if autostart:
            self.start_reader()
    
    def start_reader(self):
        if not self._keep_reading:
            self._keep_reading = True
            loop = ioloop.IOLoop.current()
            self._reader_future = loop.run_in_executor(self._reader_executor, functools.partial(self.reader, loop))
    
    def stop_reader(self):
        if self._keep_reading:
            self._keep_reading = False
            self._reader_future.cancel()
    
    def notify_clients(self, data=None):
        for con in WebSocketHandler.connections:
            try:
                con.write_message("{}".format(data))
            except Exception as ex:
                log("error sending to {}".format(con))
    
    def reader(self, main_loop):
        import random
        import time
        while self._keep_reading:
            time.sleep(1 + random.random())  # simulate read - block for some time
            data = random.getrandbits(32)
            print("reader: data={}".format(data))
            if data:
                main_loop.add_callback(self.notify_clients, data)
                self.read_data.append(data)
            time.sleep(0.1)


if __name__ == "__main__":
    app = WebApplication(True)
    app.listen(8888)
    loop = ioloop.IOLoop.current()
    try:
        loop.start()
    except KeyboardInterrupt as ex:
        app.stop_reader()
        for con in WebSocketHandler.connections:
            con.close()
        loop.stop()

Upvotes: 1

Related Questions