Reputation: 111
I'm implementing a REST API with Tornado, and I want it to be non-blocking.
Currently, the issue-related code looks like this:
class ReprsHandler(web.RequestHandler):

    async def get(self, name):
        db = await dbf.create_handler()
        if 'id' in self.request.query_arguments:
            db_future = asyncio.ensure_future(db.get_repr(name, self.get_query_argument('id')))
        else:
            db_future = asyncio.ensure_future(db.get_reprs(name))
        result = await db_future
        response = result.toSerializedStream()
        self.set_status(HTTPStatus.OK)
        self.write(response)
        self.set_header('Content-Type', 'text/plain')
        self.finish()


class App(object):

    def __init__(self, loop):
        self.server_app = web.Application(
            handlers=[
                (r"/api/v1/([a-zA-Z0-9_-]+)/reprs", ReprsHandler),
            ]
        )


def main():
    AsyncIOMainLoop().install()
    loop = asyncio.get_event_loop()
    app = App(loop)
    server = tornado.httpserver.HTTPServer(
        app.server_app,
        max_body_size=config['max_upload_size'],
        max_buffer_size=config['max_upload_size'],
    )
    server.bind(config['server_port'])
    server.start()
    loop.run_forever()
Simple code, but the response data is quite large, so it takes about 3~4 minutes to send it all.
I expected both the handler's logic and the network IO to be non-blocking, but the server's network is blocked while the response is being sent. The logic itself is fine; it doesn't block other requests.
Details:
What could be the problem? I have no idea what's causing this.
Upvotes: 2
Views: 217
Reputation: 21744
Since you mentioned that result.toSerializedStream() pickles the data, you're right that the blocking is caused by network IO.
To avoid it, send your data in chunks and call self.flush() after every self.write(). Calling flush writes the response out to the network. Since you can await flush, the coroutine pauses until the data has been written to the network socket, so the server isn't blocked and other handlers can run asynchronously.
A code sample:
async def get(self, name):
    ...
    response = result.toSerializedStream()

    chunk_size = 1024 * 1024 * 10  # 10 MiB
    start_byte = 0

    while True:
        chunk = response[start_byte : start_byte + chunk_size]
        if not chunk:
            break
        self.write(chunk)
        await self.flush()  # wait while this chunk is flushed to the network
        start_byte += chunk_size  # move start_byte forward
Important:
One important thing to note here is that self.flush() is pretty fast. If you're flushing small amounts of data to the network, the await delay is so small that the coroutine keeps running without pausing, which blocks the server.
In the sample code above, I've set chunk_size to 10 MiB, but if your computer is fast, the await delay will be very, very small and the loop might run without pausing until the whole payload is sent.
I encourage you to increase or decrease the chunk_size value as per your needs.
Further improvement suggestions:
All the data is held in memory. Now that your handler is asynchronous and doesn't block, another request arriving at ReprsHandler means even more data held in memory, and as more and more requests come in, you can tell what's going to happen.
To avoid this, instead of pickling the data into memory, dump it to a file. Then in your handler just open that file, read it in chunks, and send each chunk.
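A minimal sketch of that file-based approach (the helper name and the 10 MiB default are my own choices, not from your code): a plain generator that reads a file in fixed-size chunks, which a handler can then write and flush one chunk at a time.

```python
def iter_file_chunks(path, chunk_size=1024 * 1024 * 10):
    """Yield successive chunk_size-byte blocks of the file at `path`."""
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:  # empty bytes => end of file
                return
            yield chunk

# Inside the handler you would then do something like (sketch):
#
#     for chunk in iter_file_chunks(dump_path):
#         self.write(chunk)
#         await self.flush()  # pause here while the chunk hits the socket
```

This way only one chunk per request is ever in memory, regardless of how large the pickled dump is.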
Upvotes: 1
Reputation: 111
OK, this question was misguided.
I was expecting this non-blocking API to behave like parallel networking, so that responses wouldn't interfere with each other's network IO. That's not what Tornado is designed to be: it's non-blocking, obviously, but still single-threaded.
Upvotes: 0