OneCleverMonkey

Python Tornado - How to Implement Long-Polling Server to Read from a Queue

I'm trying to build a web server to collect "commands" via AJAX and then distribute the commands to clients via long-polling.

The goal is that someone POSTs some data to /add-command.

Another client long-polls /poll, waiting for a command to execute.

I think a queue is the right data structure for holding commands that are waiting for attention. I'd like each command to be delivered immediately to any long-polling client, but held in the queue if no client is currently polling.
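The two behaviours described here (hold a command when nobody is polling, hand it over at once when a poller is already waiting) are exactly what an asynchronous queue provides. A minimal sketch, assuming Python 3's standard asyncio.Queue as a stand-in for Tornado's queue; add_command and poll are hypothetical names for what the /add-command and /poll handlers would do:

```python
import asyncio


def add_command(queue, cmd):
    # Hypothetical /add-command logic: an unbounded queue never blocks on put
    queue.put_nowait(cmd)


async def poll(queue):
    # Hypothetical /poll logic: suspends this coroutine until an item exists,
    # without blocking the event loop
    return await queue.get()


async def demo():
    queue = asyncio.Queue()
    events = []

    # Case 1: no poller is waiting, so the command is held in the queue
    add_command(queue, "held-cmd")
    events.append(("queued", queue.qsize()))
    events.append(("polled", await poll(queue)))

    # Case 2: a poller is already waiting, so the next command goes straight to it
    waiter = asyncio.create_task(poll(queue))
    await asyncio.sleep(0)  # let the poller start and block on get()
    add_command(queue, "live-cmd")
    events.append(("delivered", await waiter))
    events.append(("left_in_queue", queue.qsize()))
    return events


events = asyncio.run(demo())
print(events)
```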

Here's my Python script:

import os
import time
import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.gen
import Queue
import multiprocessing.pool
import mysql.connector
import urlparse
import uuid
import json

_commandQueue = Queue.Queue()
_commandPollInterval = 0.2
_commandPollTimeout = 10

class HomeHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("home.htm")

class AddCommandHandler(tornado.web.RequestHandler):
    def post(self):
        d = urlparse.parse_qs(self.request.body)
        _commandQueue.put(d)
        self.write(str(True))

class PollHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        self.write("start")
        d = 1
        d = yield self.getCommand()
        self.write(str(d))
        self.write("end")
        self.finish()
    @tornado.gen.coroutine
    def getCommand(self):
        start = time.time()
        while (time.time() - start) < _commandPollTimeout * 1000:
            if not _commandQueue.empty:
                return _commandQueue.get()
            else:
                time.sleep(_commandPollInterval)
        return None 

def main():
    application = tornado.web.Application(
        [
            (r"/", HomeHandler),
            (r"/add-command", AddCommandHandler),
            (r"/poll", PollHandler),
        ], 
        debug=True, 
        template_path=os.path.join(os.path.dirname(__file__), "templates"),
        static_path=os.path.join(os.path.dirname(__file__), "static"),
    )
    tornado.httpserver.HTTPServer(application).listen(int(os.environ.get("PORT", 5000)))
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main()

The AddCommandHandler correctly puts items into _commandQueue.

The PollHandler request just times out. While a PollHandler request is in progress, _commandQueue seems to be locked: I can neither put items into it nor get them out.

I suspect I need to join the queue, but I can't seem to find the right time to do that in the code.
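The "locked queue" symptom most likely comes from the event loop rather than the queue. Tornado runs handlers on a single-threaded IOLoop by default, so while time.sleep runs inside getCommand, no other handler (including AddCommandHandler) gets a chance to execute. Joining the queue is not needed for this: Queue.join() only waits until every item has been marked with task_done(). The sketch below assumes Python 3's asyncio as a stand-in for Tornado's IOLoop (Tornado 5+ runs on top of asyncio); blocking_poll, cooperative_poll and add_command_delay are hypothetical names.

```python
import asyncio
import time


async def blocking_poll():
    # Like the question's getCommand: time.sleep stalls the entire event loop
    time.sleep(0.3)


async def cooperative_poll():
    # asyncio.sleep suspends only this coroutine (tornado.gen.sleep behaves the same way)
    await asyncio.sleep(0.3)


async def add_command_delay(poll):
    """Seconds an unrelated handler (e.g. /add-command) must wait while `poll` runs."""
    start = time.monotonic()
    poll_task = asyncio.create_task(poll())
    await asyncio.sleep(0)  # the poller's first step runs before this task resumes
    delay = time.monotonic() - start
    await poll_task
    return delay


blocked = asyncio.run(add_command_delay(blocking_poll))
free = asyncio.run(add_command_delay(cooperative_poll))
print(f"blocking: {blocked:.2f}s, cooperative: {free:.2f}s")
```

With the blocking version, the other task cannot run until the full sleep has finished; with the cooperative version it resumes almost immediately.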

UPDATE: Here's my final code, thanks to the answers.

import os
import time
import datetime
import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.gen
import tornado.queues
import urlparse
import json

_commandQueue = tornado.queues.Queue()
_commandPollInterval = 0.2
_commandPollTimeout = 10

class HomeHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("home.htm")

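class AddCommandHandler(tornado.web.RequestHandler):
    def get(self):
        # GET requests carry their parameters in the query string, not the body
        cmd = urlparse.parse_qs(self.request.query)
        # The queue is unbounded, so put_nowait never blocks or raises QueueFull
        _commandQueue.put_nowait(cmd)
        self.write(str(cmd))
    def post(self):
        # Form-encoded POST data arrives in the request body
        cmd = urlparse.parse_qs(self.request.body)
        _commandQueue.put_nowait(cmd)
        self.write(str(cmd))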

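class PollHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        # Suspends this request (without blocking the IOLoop) until a command arrives or the wait times out
        cmd = yield self.getCommand()
        self.write(str(cmd))
    @tornado.gen.coroutine
    def getCommand(self):
        try:
            # Queue.get returns a Future; yielding it lets other handlers run while we wait
            cmd = yield _commandQueue.get(
                timeout=datetime.timedelta(seconds=_commandPollTimeout)
            )
            # Python 2 generators cannot "return value", so coroutines raise gen.Return
            raise tornado.gen.Return(cmd)
        except tornado.gen.TimeoutError:
            # No command arrived in time: respond with None so the client can poll again
            raise tornado.gen.Return()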

def main():
    application = tornado.web.Application(
        [
            (r"/", HomeHandler),
            (r"/add-command", AddCommandHandler),
            (r"/poll", PollHandler),
        ], 
        debug=True, 
        template_path=os.path.join(os.path.dirname(__file__), "templates"),
        static_path=os.path.join(os.path.dirname(__file__), "static"),
    )
    tornado.httpserver.HTTPServer(application).listen(int(os.environ.get("PORT", 5000)))
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main()
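Because each get() on the queue removes exactly one item, several clients long-polling at the same time will each receive a different command rather than all receiving every command. A minimal sketch of that behaviour, assuming Python 3's asyncio.Queue as a stand-in (it follows the same one-item-per-get model, and it wakes waiting getters in the order they started waiting); the two clients and the command strings are hypothetical:

```python
import asyncio


async def demo():
    queue = asyncio.Queue()
    # Two hypothetical long-polling clients start waiting before any command exists
    first_client = asyncio.create_task(queue.get())
    second_client = asyncio.create_task(queue.get())
    await asyncio.sleep(0)  # let both clients block on get()

    # Two commands arrive (e.g. via /add-command); each wakes one waiting client
    queue.put_nowait("cmd-1")
    queue.put_nowait("cmd-2")

    received = (await first_client, await second_client)
    return received, queue.qsize()


result = asyncio.run(demo())
print(result)  # (('cmd-1', 'cmd-2'), 0)
```

If every client should see every command, a single shared queue is not enough; each client would need its own queue or some other broadcast mechanism.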

Answers (2)

kwarunek

In the async model you should avoid blocking operations; time.sleep is the culprit in your code, because it blocks the whole IOLoop. Moreover, I think the best approach is to use Tornado's queue, which has an async interface (tornado.queues.Queue), and its asynchronous get():

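import datetime
import tornado.gen
import tornado.queues
import tornado.web

_commandQueue = tornado.queues.Queue()
_commandPollTimeout = 10


class PollHandler(tornado.web.RequestHandler):
    # ...rest of the code ...

    @tornado.gen.coroutine
    def getCommand(self):
        try:
            # wait for a queue item; if none arrives within the timeout, get() raises TimeoutError
            cmd = yield _commandQueue.get(
                timeout=datetime.timedelta(seconds=_commandPollTimeout)
            )
            # Python 2 generators can't "return cmd"; use gen.Return (a plain return works on Python 3.3+)
            raise tornado.gen.Return(cmd)
        except tornado.gen.TimeoutError:
            raise tornado.gen.Return(None)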

Note: the tornado.queues module is available since Tornado 4.2; if you use an older version, Toro will help.
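For readers on Python 3 who want to try the same "get with a timeout, return None on timeout" pattern without Tornado, it can be sketched with the standard library's asyncio. asyncio.Queue.get() takes no timeout argument, so asyncio.wait_for supplies it. get_command is a hypothetical name mirroring getCommand, and the command dict only imitates what parse_qs would produce.

```python
import asyncio


async def get_command(queue, timeout):
    """asyncio analogue of getCommand: a command, or None if none arrives in time."""
    try:
        # Suspends this coroutine only; other tasks keep running while it waits
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def demo():
    queue = asyncio.Queue()
    empty_result = await get_command(queue, timeout=0.1)  # nothing queued, so None
    queue.put_nowait({"action": ["reboot"]})
    ready_result = await get_command(queue, timeout=0.1)
    return empty_result, ready_result


result = asyncio.run(demo())
print(result)  # (None, {'action': ['reboot']})
```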

deathangel908

You can NOT use time.sleep(_commandPollInterval) in a handler: it blocks the IOLoop, so nothing else, including reading from the input stream, can happen until it returns. What you should use instead is yield gen.sleep(_commandPollInterval).
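The interval-polling loop from the question, made non-blocking as this answer suggests, can be sketched as follows. This assumes Python 3's asyncio, with asyncio.sleep standing in for gen.sleep; the function name and the shortened interval and timeout are hypothetical. The sketch also corrects two other details in the question's loop: `_commandQueue.empty` is never called (the bound method is always truthy, so `not _commandQueue.empty` is always False), and time.time() already counts seconds, so `_commandPollTimeout * 1000` stretches the 10-second timeout to 10,000 seconds.

```python
import asyncio
import time

POLL_INTERVAL = 0.05  # seconds between checks (the question uses 0.2)
POLL_TIMEOUT = 0.5    # seconds before giving up (the question uses 10)


async def get_command_polling(queue):
    """Check the queue every POLL_INTERVAL seconds; return None after POLL_TIMEOUT."""
    start = time.monotonic()
    # time.monotonic() counts seconds, so compare with the timeout directly (no * 1000)
    while time.monotonic() - start < POLL_TIMEOUT:
        # empty must be *called*: the bare bound method is always truthy
        if not queue.empty():
            return queue.get_nowait()
        # Yield to the event loop instead of blocking it (gen.sleep's role in Tornado)
        await asyncio.sleep(POLL_INTERVAL)
    return None


async def demo():
    queue = asyncio.Queue()
    poller = asyncio.create_task(get_command_polling(queue))
    await asyncio.sleep(0.1)  # this task keeps running while the poller waits
    queue.put_nowait("cmd-1")
    first = await poller
    second = await get_command_polling(queue)  # nothing queued, so it times out
    return first, second


result = asyncio.run(demo())
print(result)  # ('cmd-1', None)
```

Interval polling adds up to one POLL_INTERVAL of latency per command; awaiting the queue's get() with a timeout, as in the other answer, avoids that delay.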
