Reputation: 1008
I want to run a simple background task in FastAPI, which involves some computation before dumping it into the database. However, the computation would block it from receiving any more requests.
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
db = Database()
async def task(data):
    otherdata = await db.fetch("some sql")
    newdata = somelongcomputation(data, otherdata)  # this blocks other requests
    await db.execute("some sql", newdata)
@app.post("/profile")
async def profile(data: Data, background_tasks: BackgroundTasks):
    background_tasks.add_task(task, data)
    return {}
What is the best way to solve this issue?
Upvotes: 69
Views: 81433
Reputation: 2456
This is an example of a background task in FastAPI:
import asyncio
from fastapi import FastAPI
app = FastAPI()
x = [1] # a global variable x
@app.get("/")
def hello():
    return {"message": "hello", "x": x}
async def periodic():
    while True:
        # code to run periodically starts here
        x[0] += 1
        print(f"x is now {x}")
        # code to run periodically ends here
        # sleep for 3 seconds after running the above code
        await asyncio.sleep(3)
@app.on_event("startup")
async def schedule_periodic():
    loop = asyncio.get_event_loop()
    loop.create_task(periodic())
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app)
Upvotes: 1
Reputation: 469
If your task is CPU-bound you could use multiprocessing; there is a way to do that with BackgroundTasks in FastAPI: https://stackoverflow.com/a/63171013
Although you should consider using something like Celery if there are a lot of CPU-heavy tasks.
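As a minimal, FastAPI-free sketch of the multiprocessing side (cpu_heavy and run_jobs are hypothetical names standing in for your actual computation):

```python
from multiprocessing import Pool

def cpu_heavy(n):
    # stand-in for a CPU-bound computation
    return sum(i * i for i in range(n))

def run_jobs(jobs):
    # fan the work out across worker processes, bypassing the GIL
    with Pool(processes=2) as pool:
        return pool.map(cpu_heavy, jobs)

if __name__ == "__main__":
    print(run_jobs([10, 100]))  # prints [285, 328350]
```

The background task handed to FastAPI would then just call something like run_jobs, so the event loop only waits on the pool rather than doing the computation itself.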
Upvotes: 1
Reputation: 3846
Your task is defined as async, which means FastAPI (or rather Starlette) will run it in the asyncio event loop. And because somelongcomputation is synchronous (i.e. not waiting on some IO, but doing computation) it will block the event loop for as long as it is running.
I see a few ways of solving this:
Use more workers (e.g. uvicorn main:app --workers 4). This will allow up to 4 somelongcomputation calls in parallel.
Rewrite your task to not be async (i.e. define it as def task(data): ... etc.). Then Starlette will run it in a separate thread.
Use fastapi.concurrency.run_in_threadpool, which will also run it in a separate thread. Like so:
from fastapi.concurrency import run_in_threadpool
async def task(data):
    otherdata = await db.fetch("some sql")
    newdata = await run_in_threadpool(lambda: somelongcomputation(data, otherdata))
    await db.execute("some sql", newdata)
Use asyncio's run_in_executor directly (which run_in_threadpool uses under the hood):
import asyncio
async def task(data):
    otherdata = await db.fetch("some sql")
    loop = asyncio.get_running_loop()
    newdata = await loop.run_in_executor(None, lambda: somelongcomputation(data, otherdata))
    await db.execute("some sql", newdata)
You could even pass in a concurrent.futures.ProcessPoolExecutor as the first argument to run_in_executor to run it in a separate process.
Spawn a separate thread / process yourself, e.g. using concurrent.futures.
Use something more heavy-handed like Celery. (Also mentioned in the FastAPI docs here.)
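The ProcessPoolExecutor variant mentioned above can be sketched end-to-end without FastAPI; somelongcomputation here is a hypothetical stand-in for the question's CPU-bound function:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def somelongcomputation(data, otherdata):
    # stand-in for the CPU-bound work from the question
    return sum(i * i for i in range(data)) + otherdata

async def task(data, otherdata):
    loop = asyncio.get_running_loop()
    # a dedicated process pool sidesteps the GIL entirely,
    # so the event loop stays free while the work runs
    with ProcessPoolExecutor() as pool:
        return await loop.run_in_executor(
            pool, somelongcomputation, data, otherdata
        )

if __name__ == "__main__":
    print(asyncio.run(task(10, 5)))  # prints 290
```

In a real app you would typically create one long-lived pool at startup instead of a fresh one per task, since spawning worker processes has noticeable overhead.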
Upvotes: 114
Reputation: 593
Read this issue.
Also, in the example below, my_model.function_b could be any blocking function or process.
TL;DR
from starlette.concurrency import run_in_threadpool
@app.get("/long_answer")
async def long_answer():
    rst = await run_in_threadpool(my_model.function_b, arg_1, arg_2)
    return rst
Upvotes: 1