del-boy

Reputation: 3654

Tornado with ThreadPoolExecutor

I have a setup that uses Tornado as the HTTP server together with a custom-made HTTP framework. The idea is to have a single Tornado handler: every request that arrives is submitted to a ThreadPoolExecutor, leaving Tornado free to listen for new requests. Once a thread finishes processing a request, a callback is invoked that sends the response to the client on the same thread where the IO loop is executing.

Stripped down, the code looks something like this. The base HTTP server class:

class HttpServer:
    def __init__(self, router, port, max_workers):
        self.router = router
        self.port = port
        self.max_workers = max_workers

    def run(self):
        raise NotImplementedError()

Tornado backed implementation of HttpServer:

from concurrent import futures

from tornado import ioloop, web

class TornadoServer(HttpServer):
    def run(self):
        executor = futures.ThreadPoolExecutor(max_workers=self.max_workers)

        def submit(callback, **kwargs):
            # Request is the framework's callable request object;
            # the pool invokes it on a worker thread
            future = executor.submit(Request(**kwargs))
            future.add_done_callback(callback)
            return future

        application = web.Application([
            (r'(.*)', MainHandler, {
                'submit': submit,
                'router': self.router
            })
        ])

        application.listen(self.port)

        ioloop.IOLoop.instance().start()

The main handler that handles all Tornado requests (only GET is implemented, but the others would be the same):

from concurrent.futures import Future
from functools import partial

from tornado import ioloop, web

class MainHandler(web.RequestHandler):
    def initialize(self, submit, router):
        self.submit = submit
        self.router = router

    def worker(self, request):
        responder, kwargs = self.router.resolve(request)
        response = responder(**kwargs)
        return response

    def on_response(self, response):
        # when this is called, the response should already have a result
        if isinstance(response, Future):
            response = response.result()
        # response is my own class; just write the returned content to the client
        self.write(response.data)
        self.flush()
        self.finish()

    def _on_response_ready(self, response):
        # schedule response processing on the IO loop thread;
        # add_callback is the thread-safe way to do this
        ioloop.IOLoop.current().add_callback(
            partial(self.on_response, response)
        )

    @web.asynchronous
    def get(self, url):
        self.submit(
            self._on_response_ready,  # callback
            url=url, method='get', original_request=self.request
        )

The server is started with something like:

router = Router()
server = TornadoServer(router, 1111, max_workers=50)
server.run()

So, as you can see, the main handler just submits every request to the thread pool, and when processing is done, the callback (_on_response_ready) is invoked, which schedules the finishing of the request on the IO loop (to make sure it happens on the same thread where the IO loop is executing).
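One subtlety worth spelling out: the _on_response_ready hop is needed because Future.add_done_callback runs its callback on the worker thread (or on the submitting thread, if the future has already finished), not on the IO loop thread, and Tornado's IOLoop.add_callback is the documented thread-safe way back. A minimal stdlib-only sketch (names are illustrative, no Tornado involved) showing where the done-callback actually runs:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()
ran_in = []

def work():
    release.wait()            # keep the future running until the callback is attached
    return "response"

def on_done(future):
    # record which thread the done-callback actually runs on
    ran_in.append(threading.current_thread().name)

with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as pool:
    fut = pool.submit(work)
    fut.add_done_callback(on_done)   # attached while the future is still pending
    release.set()

print(ran_in[0].startswith("worker"))   # True: it ran on the pool thread, not main
```

Since the callback fires on a pool thread, touching the RequestHandler directly from it would be unsafe; hence the add_callback hand-off in the code above.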

This works. At least it looks like it does.

My problem is performance with respect to max_workers in the ThreadPoolExecutor.

All handlers are IO-bound; there is no computation going on (they are mostly waiting for a DB or external services), so with 50 workers I would expect 50 concurrent requests to finish approximately 50 times faster than 50 concurrent requests with only one worker.

But that is not the case. I see almost identical requests per second with 50 workers in the thread pool as with 1 worker.
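The scaling expectation itself is sound. As a sanity check outside Tornado entirely, a minimal sketch (illustrative sleep duration and task count) shows that sleeping tasks in a ThreadPoolExecutor do overlap, so a pooled run finishes in roughly one task's duration rather than the sum:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def io_bound_task(_):
    time.sleep(0.05)   # stand-in for waiting on a DB or external service

def timed(workers, tasks=10):
    # run `tasks` sleeping tasks through a pool of `workers` threads
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(io_bound_task, range(tasks)))
    return time.monotonic() - start

serial = timed(1)     # roughly tasks * 0.05s: one sleep after another
pooled = timed(10)    # roughly 0.05s: all the sleeps overlap
print(pooled < serial)   # True
```

If the pool scales in isolation but not behind Tornado, the bottleneck is somewhere in the request path rather than in the executor itself.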

For measuring, I used ApacheBench with something like:

ab -n 100 -c 10 http://localhost:1111/some_url

Does anybody have an idea what I am doing wrong? Did I misunderstand how Tornado or ThreadPoolExecutor works, or the combination?

Upvotes: 14

Views: 3595

Answers (1)

J_H

Reputation: 20450

The momoko wrapper for postgres remedies this issue, as suggested by kwarunek. If you want to solicit further debugging advice from outside collaborators, it would help to post timestamped debug logs from a test task that does sleep(10) before each DB access.
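A minimal sketch of the kind of instrumentation suggested above, assuming a hypothetical responder function; the delay is shortened from the suggested sleep(10) only so the sketch runs quickly:

```python
import logging
import time

# timestamped, thread-tagged records make it easy to see whether
# requests actually overlap or are being serialized somewhere
logging.basicConfig(
    format="%(asctime)s.%(msecs)03d %(threadName)s %(message)s",
    datefmt="%H:%M:%S",
    level=logging.DEBUG,
)
log = logging.getLogger("responder")

DELAY = 0.1   # the suggested test uses 10 seconds; shortened here for brevity

def slow_responder(url):
    # hypothetical responder standing in for a real handler
    log.debug("start %s", url)
    time.sleep(DELAY)          # stand-in for the sleep(10) before each DB access
    log.debug("done %s", url)
    return "ok"

result = slow_responder("/some_url")
```

With this in place, log lines from concurrent requests should interleave across worker threads; if the "start"/"done" pairs appear strictly one after another, the requests are being serialized.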

Upvotes: 1
