Reputation: 422
I'm trying to build a web server to collect "commands" via AJAX and then distribute the commands to clients via long-polling.
The goal is that someone POSTs some data to /add-command.
Another client implements a long-polling client hitting /poll waiting for a command to execute.
I think a queue is the right data structure to use to hold commands waiting for attention. I'd like the commands to essentially be distributed immediately to any long-polling client but held if no client is currently polling.
Here's my python script.
import os
import time
import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.gen
import Queue
import multiprocessing.pool
import mysql.connector
import urlparse
import uuid
import json
_commandQueue = Queue.Queue()
_commandPollInterval = 0.2
_commandPollTimeout = 10
class HomeHandler(tornado.web.RequestHandler):
def get(self):
self.render("home.htm")
class AddCommandHandler(tornado.web.RequestHandler):
def post(self):
d = urlparse.parse_qs(self.request.body)
_commandQueue.put(d)
self.write(str(True))
class PollHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self):
self.write("start")
d = 1
d = yield self.getCommand()
self.write(str(d))
self.write("end")
self.finish()
@tornado.gen.coroutine
def getCommand(self):
start = time.time()
while (time.time() - start) < _commandPollTimeout * 1000:
if not _commandQueue.empty:
return _commandQueue.get()
else:
time.sleep(_commandPollInterval)
return None
def main():
application = tornado.web.Application(
[
(r"/", HomeHandler),
(r"/add-command", AddCommandHandler),
(r"/poll", PollHandler),
],
debug=True,
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
)
tornado.httpserver.HTTPServer(application).listen(int(os.environ.get("PORT", 5000)))
tornado.ioloop.IOLoop.instance().start()
if __name__ == "__main__":
main()
The AddCommandHandler
works fine to put items in the _commandQueue
.
The PollHandler
request just times out. If I call the PollHandler
, it seems to lock the _commandQueue
and I can't put or get from it.
I suspect I need to join the queue, but I can't seem to find the right time to do that in the code.
UPDATE -- Here's my final code thanks to the answers
import os
import time
import datetime
import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.gen
import tornado.queues
import urlparse
import json
_commandQueue = tornado.queues.Queue()
_commandPollInterval = 0.2
_commandPollTimeout = 10
class HomeHandler(tornado.web.RequestHandler):
def get(self):
self.render("home.htm")
class AddCommandHandler(tornado.web.RequestHandler):
def get(self):
cmd = urlparse.parse_qs(self.request.body)
_commandQueue.put(cmd)
self.write(str(cmd))
def post(self):
cmd = urlparse.parse_qs(self.request.body)
_commandQueue.put(cmd)
self.write(str(cmd))
class PollHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self):
cmd = yield self.getCommand()
self.write(str(cmd))
@tornado.gen.coroutine
def getCommand(self):
try:
cmd = yield _commandQueue.get(
timeout=datetime.timedelta(seconds=_commandPollTimeout)
)
raise tornado.gen.Return(cmd)
except tornado.gen.TimeoutError:
raise tornado.gen.Return()
def main():
application = tornado.web.Application(
[
(r"/", HomeHandler),
(r"/add-command", AddCommandHandler),
(r"/poll", PollHandler),
],
debug=True,
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
)
tornado.httpserver.HTTPServer(application).listen(int(os.environ.get("PORT", 5000)))
tornado.ioloop.IOLoop.instance().start()
if __name__ == "__main__":
main()
Upvotes: 1
Views: 1263
Reputation: 12587
In async model you should omit blocking operation, time.sleep
is evil in your code. Moreover, I think that the best way is to use tornado's (in async interface) queue - tornado.queue.Queue
and use async get:
import datetime
import tornado.gen
import tornado.queues
_commandQueue = tornado.queues.Queue()
# ...rest of the code ...
@tornado.gen.coroutine
def getCommand(self):
try:
# wait for queue item if cannot obtain in timeout raise exception
cmd = yield _commandQueue.get(
timeout=datetime.timedelta(seconds=_commandPollTimeout)
)
return cmd
except tornado.gen.Timeout:
return None
Note: Module tornado.queues
si available since Tornado 4.x, if you use older one, Toro will help.
Upvotes: 1
Reputation: 9689
You can NOT use sleep in listener, since it blocks reading from input stream. time.sleep(_commandPollInterval)
. What you should use is yield gen.sleep(_commandPollInterval)
Upvotes: 1