GSazheniuk

How to write non-blocking, chunked RequestHandler in Tornado

Here are two simple RequestHandlers:

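import tornado.web
from datetime import datetime as dt
from tornado import gen
from tornado.concurrent import Future

# Futures of the currently waiting subscribers (assumed: a module-level set)
global_futures = set()


class AsyncHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        while True:
            # Register a fresh future and wait for AsyncHandler2 to resolve it
            future = Future()
            global_futures.add(future)
            s = yield future
            self.write(s)
            self.flush()


class AsyncHandler2(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        # Wake every waiting subscriber with the current time
        for f in global_futures:
            f.set_result(str(dt.now()))
        global_futures.clear()
        self.write("OK")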

The first one "subscribes" to the stream; the second one delivers a message to all subscribers.

The problem is that I cannot have more than a handful of subscribers (5-6 in my case). As soon as I open more subscriptions than that, the next request to the second handler simply hangs.

I assume this happens because the first handler is not properly asynchronous. Is it because I am using a global object to store the list of subscribers?

How can I have more streaming requests open simultaneously, and what is the practical limit?

Answers (1)

Ben Darnell

The problem is that global_futures is modified while you are iterating over it. When AsyncHandler.get wakes up, it runs from one yield to the next, which means it creates its next Future and adds it to the set before control returns to AsyncHandler2. The result is undefined and depends on where the iterator is in the set: sometimes the new future is inserted "behind" the iterator and everything is fine; sometimes it is inserted "in front of" the iterator, and the same consumer handler is woken up a second time (and inserts a third future of its own, which may again land in front or behind...). With only a few consumers you hit the "behind" case often enough that things work, but with many consumers it becomes extremely unlikely that the loop ever finishes.
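
To make the failure mode concrete, here is a small plain-Python model, not Tornado code. It assumes, as described for Tornado 4.x, that notifying a subscriber runs it synchronously until it has re-subscribed. Callbacks stand in for futures and a list stands in for the set: appending to a list always lands "in front of" the iterator, so the model shows the worst case deterministically. The names make_subscriber, publish_live and publish_snapshot (and the cap parameter) are hypothetical.

```python
# Plain-Python model, not Tornado code. Callbacks stand in for futures and a
# list stands in for the set; each callback re-subscribes synchronously when
# notified, mimicking the Tornado 4.x behavior described in the answer.


def make_subscriber(name, subscribers, log):
    def on_message(msg):
        log.append((name, msg))
        subscribers.append(on_message)  # re-subscribe before returning
    return on_message


def publish_live(subscribers, msg, cap=100):
    """The question's pattern: notify while iterating the live collection."""
    delivered = 0
    for cb in subscribers:
        if delivered == cap:  # safety valve; without it the loop never ends
            break
        cb(msg)
        delivered += 1
    subscribers.clear()
    return delivered


def publish_snapshot(subscribers, msg):
    """The answer's fix: copy and clear first, then notify the copy."""
    fs = list(subscribers)
    subscribers.clear()
    for cb in fs:
        cb(msg)
    return len(fs)
```

With three subscribers, publish_live stops only because it hits its cap of 100 deliveries (the same three callbacks keep being re-notified), while publish_snapshot notifies each subscriber exactly once and leaves three fresh subscriptions in the list for the next round.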

The solution is to copy global_futures and clear it before iterating, instead of clearing it at the end:

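@gen.coroutine
def get(self):
    # Snapshot the current subscribers and reset the set before waking anyone,
    # so futures added by re-subscribing handlers go into the fresh set.
    fs = list(global_futures)
    global_futures.clear()
    for f in fs:
        f.set_result(str(dt.now()))
    self.write("OK")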

Note that I think this is only a problem in Tornado 4.x and older. Tornado 5 changed things so that set_result no longer calls into the waiting handler immediately, so the set is no longer modified while it is being iterated.
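
On Python 3, Tornado 5's Future is asyncio's Future, and asyncio.Future.set_result schedules the waiting coroutine's wakeup on the event loop rather than running it inline. The sketch below uses plain asyncio (no Tornado) to model the same subscribe/publish pattern with the copy-then-clear fix, and asserts that no subscriber has resumed by the time publish returns. The names subscriber, publish, wait_for_subscribers and main, and the counts of 10 subscribers and 3 rounds, are hypothetical choices for illustration.

```python
import asyncio


async def subscriber(name, subscribers, log, rounds):
    """Plays the role of AsyncHandler.get: wait, record, re-subscribe."""
    loop = asyncio.get_running_loop()
    for _ in range(rounds):
        fut = loop.create_future()
        subscribers.add(fut)
        msg = await fut
        log.append((name, msg))


def publish(subscribers, msg):
    """Plays the role of the fixed AsyncHandler2.get: copy, clear, notify."""
    fs = list(subscribers)
    subscribers.clear()
    for f in fs:
        f.set_result(msg)


async def wait_for_subscribers(subscribers, n):
    # Yield to the event loop until all n subscribers have a pending future.
    while len(subscribers) < n:
        await asyncio.sleep(0)


async def main(n=10, rounds=3):
    subscribers = set()
    log = []
    tasks = [asyncio.create_task(subscriber(i, subscribers, log, rounds))
             for i in range(n)]
    for r in range(rounds):
        await wait_for_subscribers(subscribers, n)
        publish(subscribers, r)
        # set_result only scheduled the wakeups, so nobody has logged round r yet.
        assert len(log) == r * n
    await asyncio.gather(*tasks)
    return log


log = asyncio.run(main())
print(len(log))  # 30
```

Running it prints 30 (10 subscribers times 3 rounds). Copying before iterating is still worthwhile even where wakeups are deferred, because it makes the publisher independent of when subscribers happen to resume.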
