Reputation: 2605
I have implemented all my routes using async. And have followed all guidelines from the FastAPI documentation.
Every route has multiple DB calls, which does not have async support, so they are normal function like this
def db_fetch(query):
# I take a few seconds to respond
return
To avoid blocking my event loop I use fastapi.concurrancy.run_in_threadpool
Now the issue is, when a large number of requests come, my new requests are getting blocked. Even if I close the browser tab (cancel request), the entire app gets stuck till the older requests get processed.
What am I doing wrong here?
I use uvicorn
as my ASGI server. I run in a kubernetes cluster with 2 replica.
Few suspects: Am I spawning too many threads? Is it some bug within uvicron? Not really sure!
Upvotes: 9
Views: 16922
Reputation: 1
import threading
import time
from anyio.to_thread import current_default_thread_limiter
from fastapi import FastAPI
from starlette.concurrency import run_in_threadpool
app = FastAPI()
current_default_thread_limiter().total_tokens = 1000
def test():
print(threading.current_thread().native_id)
time.sleep(10)
return 'ok'
@app.get("/")
async def read_root():
result = await run_in_threadpool(func=test)
print(result)
return {"Hello": "World"}
Upvotes: -1
Reputation: 4539
It is as you've said an issue with too many threads. Under the hood, fastapi uses starlette which in turn uses anyio's to_thread.run_sync
. As described here, too many threads could lead to an issue and you could shield them using a semaphore to set an upper bound on the maximum threads created. In code, that would read roughly like
# Core Library
from typing import TypeVar, Callable
from typing_extensions import ParamSpec
# Third party
from anyio import Semaphore
from starlette.concurrency import run_in_threadpool
# To not have too many threads running (which could happen on too many concurrent
# requests, we limit it with a semaphore.
MAX_CONCURRENT_THREADS = 10
MAX_THREADS_GUARD = Semaphore(MAX_CONCURRENT_THREADS)
T = TypeVar("T")
P = ParamSpec("P")
async def run_async(func: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
async with MAX_THREADS_GUARD:
return await run_in_threadpool(func, *args, **kwargs)
Upvotes: 10