Ram

Reputation: 748

python aiohttp behaving like single threaded application

I have the following simple Python web server application. When I call /sleep, all other calls to /quick are blocked until the sleep ends and the response is returned. I am not sure what is wrong with this code. Can someone provide some clarity?

from aiohttp import web
import asyncio
import time

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)

async def sleephandle(request):
    name = request.match_info.get('name', "Anonymous")
    time.sleep(12)  # trivializing here; actual code has a transition from async to sync
    text = "Hello, " + name
    return web.Response(text=text)

async def init(loop):
    app = web.Application(loop=loop)
    app.router.add_get('/quick', handle)
    app.router.add_get('/sleep', sleephandle)
    srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
    print('server started')
    return srv

def create_server():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init(loop))
    loop.run_forever()

create_server()

Upvotes: 0

Views: 371

Answers (1)

Artiom Kozyrev

Reputation: 3846

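time.sleep is a blocking call, and it runs on the same thread as the event loop, so no other handler can run until it returns. The key idea of my solution is to move the blocking call off that thread with loop.run_in_executor, using the pool type that fits your case. You can solve the problem the following way: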

from aiohttp import web
import asyncio
import time
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime


def blocking_code():
    """Some long running code"""
    time.sleep(12)
    return "!!!!"


async def blocking_code_task(loop: asyncio.BaseEventLoop, request: web.Request):
    """Wrapper to be used in asyncio Task"""
    r = await loop.run_in_executor(executor=request.app["workers_pool"], func=blocking_code)
    logging.info(f"{datetime.now()}: {r}")


async def handle(request: web.Request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)


async def sleephandle(request: web.Request):
    """We wait for the result here, then send the response"""
    name = request.match_info.get('name', "Anonymous")
    loop = asyncio.get_running_loop()
    # if you want to wait for the result
    r = await loop.run_in_executor(executor=request.app["workers_pool"], func=blocking_code)
    text = "Hello, " + name + r
    return web.Response(text=text)


async def fast_sleep_answer(request: web.Request):
    """We send response as fast as possible and do all work in another asyncio Task"""
    name = request.match_info.get('name', "Anonymous")
    loop = asyncio.get_event_loop()
    # if you do not want to wait for the result
    asyncio.create_task(blocking_code_task(loop, request))
    text = "Fast answer, " + name
    return web.Response(text=text)


async def on_shutdown(app):
    """Do not forget to correctly close ThreadPool"""
    app["workers_pool"].shutdown()
    logging.info(f"{datetime.now()}: Pool is closed")


async def init(args=None):
    """Changed your code for newer aiohttp"""
    pool = ThreadPoolExecutor(8)
    app = web.Application()
    app.router.add_get('/quick', handle)
    app.router.add_get('/sleep', sleephandle)
    app.router.add_get('/fast', fast_sleep_answer)
    app["workers_pool"] = pool  # can be ThreadPool or ProcessPool
    app.on_shutdown.append(on_shutdown)  # close the pool when app closes
    return app

# A better way to run the app, assuming this file is named x.py
# (on Linux the interpreter may be called python3):
# python -m aiohttp.web -H 0.0.0.0 -P 8080 x:init
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    web.run_app(init(), host="0.0.0.0", port=8080)

All blocking I/O operations run in the ThreadPoolExecutor. If your tasks are CPU-bound, use a ProcessPoolExecutor instead. I showed two cases: 1) you have to wait for the result before you can answer (/sleep); 2) you can answer immediately and do all the work in the background (/fast).
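
If you want to see the difference without starting a web server, here is a minimal sketch that uses only asyncio. The names heartbeat and measure, and the 0.3 s and 0.05 s timings, are made up for this illustration and are not part of aiohttp or of the code above. A background task ticks every 50 ms; the longest gap between ticks shows whether the event loop was stalled while the blocking call ran.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_code(seconds: float) -> str:
    """Stand-in for any synchronous call; time.sleep blocks the calling thread."""
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: list, interval: float, count: int) -> None:
    """Append a timestamp every `interval` seconds (hypothetical helper)."""
    for _ in range(count):
        await asyncio.sleep(interval)
        ticks.append(time.monotonic())


async def measure(use_executor: bool) -> float:
    """Return the longest gap between heartbeat ticks around one blocking call."""
    loop = asyncio.get_running_loop()
    ticks = [time.monotonic()]
    beat = asyncio.create_task(heartbeat(ticks, 0.05, 10))
    await asyncio.sleep(0)  # yield once so the heartbeat task gets started
    if use_executor:
        # The blocking call runs on a worker thread; the loop keeps ticking.
        with ThreadPoolExecutor(max_workers=1) as pool:
            await loop.run_in_executor(pool, blocking_code, 0.3)
    else:
        # The blocking call runs on the event loop thread; nothing else can run.
        blocking_code(0.3)
    await beat
    return max(later - earlier for earlier, later in zip(ticks, ticks[1:]))


if __name__ == "__main__":
    print("longest gap, direct call:", asyncio.run(measure(use_executor=False)))
    print("longest gap, run_in_executor:", asyncio.run(measure(use_executor=True)))
```

With the direct call the longest gap should be at least as long as the sleep itself, while with run_in_executor it should stay close to the heartbeat interval. That is the same effect you see on /quick while /sleep is running.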

Upvotes: 1
