Travis Griggs

Reputation: 22252

Any way for tornado handler to detect closure on other end?

I have a tornado coroutine handler that looks in part like:

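from collections import defaultdict

import tornado.gen
import tornado.queues
import tornado.web

class QueryHandler(tornado.web.RequestHandler):
    # one queue per network, shared by all handler instances
    queryQueues = defaultdict(tornado.queues.Queue)

    @tornado.gen.coroutine
    def get(self, network):
        qq = self.queryQueues[network]
        query = yield qq.get()
        # do some work with the dequeued query to build `response`
        self.write(response)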

On the client side, I use python-requests to long poll it:

fetched = session.get(QueryURL)

I can make a query, and the server blocks waiting on the queue until I cough up something to process, then finally responds.

This works pretty slick until... the long poll gets shut down and restarted while the handler is blocking on the queue. When I stop the query on the client side, the handler stays happily blocked. Worse, if I restart the query on the client side, I now have a second handler instance blocking on the same queue. So when the queue DOES have data show up, the stale handler processes it and replies to the bitbucket, and the restarted query is now blocked indefinitely.

Is there a pattern I can use to avoid this? I had hoped that when the client side closed, the handler would receive some sort of exception indicating that things have gone south. The queue.get() can have a timeout, but what I really want is not a timeout but a sort of "unless I close" exception.

Upvotes: 0

Views: 222

Answers (1)

A. Jesse Jiryu Davis

Reputation: 24009

You want a "queue with guaranteed delivery", which is a hard problem in distributed systems. After all, even if "self.write" succeeds, you can't be certain the other end really received the message.

A basic approach would look like this:

  • each entry in the queue gets an id greater than all previous ids
  • when the client connects it asks to subscribe to the queue
  • when a client is disconnected, it reconnects and asks for all entries with ids greater than the last id it saw
  • when your QueryHandler receives a get with a non-None id, it first serves all entries with ids greater than id, then begins waiting on the queue
  • when your QueryHandler raises an exception from self.write, ignore it: the client is responsible for retrieving the lost entry
  • keep all past entries in a list
  • expire the oldest list entries after some time (hours?)
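
The bookkeeping behind those steps can be sketched roughly as follows. This is a minimal in-memory illustration, not a Tornado API: the ReplayLog class, its method names, and the one-hour default are all hypothetical, and it assumes a single server process.

```python
import itertools
import time


class ReplayLog:
    """Hypothetical in-memory log of past entries with strictly increasing ids."""

    def __init__(self, max_age_seconds=3600.0, clock=time.monotonic):
        self._ids = itertools.count(1)  # each new id is greater than all previous ids
        self._entries = []              # (id, timestamp, payload), oldest first
        self._max_age = max_age_seconds
        self._clock = clock             # injectable so expiry can be tested

    def append(self, payload):
        """Keep the payload in the list and return the id assigned to it."""
        entry_id = next(self._ids)
        self._entries.append((entry_id, self._clock(), payload))
        return entry_id

    def since(self, last_id):
        """Return (id, payload) pairs newer than last_id.

        A client that passes None has seen nothing yet and only subscribes,
        so there is nothing to replay for it.
        """
        if last_id is None:
            return []
        return [(i, p) for i, _, p in self._entries if i > last_id]

    def expire(self):
        """Drop entries older than max_age_seconds."""
        cutoff = self._clock() - self._max_age
        self._entries = [e for e in self._entries if e[1] >= cutoff]
```

In a handler, the client would send the last id it saw (for example as a query argument, which is an assumption here), the handler would serve everything from since(last_id) first, and only then wait on the queue. Calling expire() periodically keeps the list from growing without bound.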

Upvotes: 1
