ScubaChris

Reputation: 198

FastAPI, add long tasks to buffer and process them one by one, while maintaining server responsiveness

I am trying to set up a FastAPI server that will take as input some biological data, and run some processing on them. Since the processing takes up all the server's resources, queries should be processed sequentially. However, the server should stay responsive and add further requests in a buffer. I've been trying to use the BackgroundTasks module for this, but after sending the second query, the response gets delayed while the task is running. Any help appreciated, and thanks in advance.

import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks

EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}

app = FastAPI()

@dataclass
class Query():
    query_name: str
    query_sequence: str
    experiment_id: str = None
    status: str = "pending"

    def __post_init__(self):
        self.experiment_id = str(time.time())
        self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
        os.makedirs(self.experiment_dir, exist_ok=False)

    def run(self):
        self.status = "running"
        # perform some long task using the query sequence and get a return code #
        self.status = "finished"
        return 0 # or another code depending on the final output

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    query_data = await request.body()
    query_data = query_data.decode("utf-8")
    query_data = dict(str(x).split("=") for x in query_data.split("&"))
    query = Query(**query_data)
    QUERY_BUFFER[query.experiment_id] = query
    background_tasks.add_task(process, query)
    return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}

async def process(query):
    """ Process query and generate data"""
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    print(f'Query {query.experiment_id} processing finished with return code {ret_code}.')

@app.get("/backlog/")
def return_backlog():
    return {f"Currently {len(QUERY_BUFFER)} jobs in the backlog."}

Upvotes: 10

Views: 9279

Answers (2)

Ben

Reputation: 2027

EDIT:

The original answer was influenced by testing with httpx.AsyncClient (as the original caveat flagged might be the case). With the test client, background tasks block the response even though they do not block when the app runs under a real server. As such, there's a simpler solution, provided you don't need to test it with httpx.AsyncClient. The new solution runs under uvicorn, and I tested it manually with Postman instead.

This solution uses a function as the background task (process) so that it runs outside the main thread. It then schedules a job to run aprocess which will run in the main thread when the event loop gets a chance. The aprocess coroutine is able to then await the run coroutine of your Query as before.

Additionally, I've added a time.sleep(10) to the process function to illustrate that even long-running non-IO tasks will not prevent your original HTTP session from sending a response back to the client. This only works if the task releases the GIL, though. If it's CPU-bound, you might want a separate process altogether, either via multiprocessing or a separate service. Finally, I've replaced the prints with logging so that they work along with the uvicorn logging.

import asyncio
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks
import logging


logging.basicConfig(level=logging.INFO, format="%(levelname)-9s %(asctime)s - %(name)s - %(message)s")
LOGGER = logging.getLogger(__name__)

EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}

app = FastAPI()
loop = asyncio.get_event_loop()

@dataclass
class Query():
    query_name: str
    query_sequence: str
    experiment_id: str = None
    status: str = "pending"

    def __post_init__(self):
        self.experiment_id = str(time.time())
        self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
        # os.makedirs(self.experiment_dir, exist_ok=False) # Commented out for testing

    async def run(self):
        self.status = "running"
        await asyncio.sleep(5)  # simulate long running query
        # perform some long task using the query sequence and get a return code #
        self.status = "finished"
        return 0 # or another code depending on the final output

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    query_data = await request.body()
    query_data = query_data.decode("utf-8")
    query_data = dict(str(x).split("=") for x in query_data.split("&"))
    query = Query(**query_data)
    QUERY_BUFFER[query.experiment_id] = query
    background_tasks.add_task(process, query)
    LOGGER.info(f'root - added task')
    return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}


def process(query):
    """ Schedule processing of query, and then run some long running non-IO job without blocking the app"""
    asyncio.run_coroutine_threadsafe(aprocess(query), loop)
    LOGGER.info(f"process - {query.experiment_id} - Submitted query job. Now run non-IO work for 10 seconds...")
    time.sleep(10) # simulate long running non-IO work, does not block app as this is in another thread - provided it is not cpu bound.
    LOGGER.info(f'process - {query.experiment_id} - wake up!')


async def aprocess(query):
    """ Process query and generate data """
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    LOGGER.info(f'aprocess - Query {query.experiment_id} processing finished with return code {ret_code}.')


@app.get("/backlog/")
def return_backlog():
    return {f"return_backlog - Currently {len(QUERY_BUFFER)} jobs in the backlog."}


if __name__ == "__main__":
    import uvicorn
    # "scratch_26" is this file's module name (scratch_26.py); change it to match your own file name.
    uvicorn.run("scratch_26:app", host="127.0.0.1", port=8000)
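The hand-off between the side thread and the event loop can also be seen in isolation, without FastAPI. This is only a minimal sketch: asyncio.to_thread stands in for the way Starlette runs a non-async background task in a thread, and the names finish_job, worker and main are made up for the illustration.

```python
import asyncio
import threading


async def finish_job(job_id: str, seen: dict) -> None:
    """Coroutine scheduled from the side thread; it runs on the loop's thread."""
    await asyncio.sleep(0.05)  # stand-in for awaitable I/O
    seen["coroutine_thread"] = threading.get_ident()
    seen["job_id"] = job_id


def worker(job_id: str, loop: asyncio.AbstractEventLoop, seen: dict) -> None:
    """Plain function running in a side thread (assumption: Starlette does the same)."""
    seen["worker_thread"] = threading.get_ident()
    future = asyncio.run_coroutine_threadsafe(finish_job(job_id, seen), loop)
    future.result(timeout=5)  # blocks only this side thread, not the event loop


async def main() -> dict:
    seen = {"loop_thread": threading.get_ident()}
    loop = asyncio.get_running_loop()
    await asyncio.to_thread(worker, "job-1", loop, seen)
    return seen
```

Running asyncio.run(main()) should show the coroutine executing on the loop's thread while worker runs on a different one, which is the same split as process and aprocess above.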

ORIGINAL ANSWER:

*A caveat on this answer - I've tried testing this with `httpx.AsyncClient`, which might account for different behavior compared to deploying behind uvicorn.*

From what I can tell (and I am very open to correction on this), BackgroundTasks actually need to complete prior to an HTTP response being sent. This is not what the Starlette docs or the FastAPI docs say, but it appears to be the case, at least while using the httpx AsyncClient.

Whether you add a coroutine (which is executed in the main thread) or a function (which gets executed in its own side thread), the HTTP response is blocked from being sent until the background task is complete.

If you want to await a long running (asyncio friendly) task, you can get around this problem by using a wrapper function. The wrapper function adds the real task (a coroutine, since it will be using await) to the event loop and then returns. Since this is very fast, the fact that it "blocks" no longer matters (assuming a few milliseconds doesn't matter).

The real task then gets executed in turn (but after the initial HTTP response has been sent), and although it's on the main thread, the asyncio part of the function will not block.

You could try this:

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    ...
    background_tasks.add_task(process_wrapper, query)
    ...

async def process_wrapper(query):
    loop = asyncio.get_event_loop()
    loop.create_task(process(query))

async def process(query):
    """ Process query and generate data"""
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    print(f'Query {query.experiment_id} processing finished with return code {ret_code}.')

Note that you'll also need to make your run() function a coroutine by adding the async keyword, since you're expecting to await it from your process() function.
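To see why the async keyword matters, here is a small standalone sketch. SyncQuery and AsyncQuery are hypothetical stand-ins for the Query class, not the original code. Awaiting the plain method's return value fails, while awaiting the coroutine method works.

```python
import asyncio


class SyncQuery:
    def run(self) -> int:  # plain method, as in the question
        return 0


class AsyncQuery:
    async def run(self) -> int:  # coroutine method, as suggested above
        await asyncio.sleep(0)
        return 0


async def process(query) -> str:
    """Await query.run() and report what happened."""
    try:
        ret_code = await query.run()  # awaiting a plain int raises TypeError
    except TypeError as exc:
        return f"failed: {type(exc).__name__}"
    return f"ok: {ret_code}"
```

Here asyncio.run(process(SyncQuery())) reports the TypeError, and asyncio.run(process(AsyncQuery())) returns the code as expected.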

Here's a full working example that uses httpx.AsyncClient to test it. I've added the fmt_duration helper function to show the elapsed time for illustrative purposes. I've also commented out the code that creates directories, and simulated a 2-second query duration in the run() function.

import asyncio
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks
from httpx import AsyncClient

EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}

app = FastAPI()
start_ts = time.time()


@dataclass
class Query():
    query_name: str
    query_sequence: str
    experiment_id: str = None
    status: str = "pending"

    def __post_init__(self):
        self.experiment_id = str(time.time())
        self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
        # os.makedirs(self.experiment_dir, exist_ok=False) # Commented out for testing

    async def run(self):
        self.status = "running"
        await asyncio.sleep(2)  # simulate long running query
        # perform some long task using the query sequence and get a return code #
        self.status = "finished"
        return 0 # or another code depending on the final output

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    query_data = await request.body()
    query_data = query_data.decode("utf-8")
    query_data = dict(str(x).split("=") for x in query_data.split("&"))
    query = Query(**query_data)
    QUERY_BUFFER[query.experiment_id] = query
    background_tasks.add_task(process_wrapper, query)
    print(f'{fmt_duration()} - root - added task')
    return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}


async def process_wrapper(query):
    loop = asyncio.get_event_loop()
    loop.create_task(process(query))

async def process(query):
    """ Process query and generate data"""
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    print(f'{fmt_duration()} - process - Query {query.experiment_id} processing finished with return code {ret_code}.')

@app.get("/backlog/")
def return_backlog():
    return {f"{fmt_duration()} - return_backlog - Currently {len(QUERY_BUFFER)} jobs in the backlog."}


async def test_me():
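    # Newer httpx releases dropped the app= shortcut; there, pass transport=httpx.ASGITransport(app=app) instead.
    async with AsyncClient(app=app, base_url="http://example") as ac: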
        res = await ac.post("/", content="query_name=foo&query_sequence=42")
        print(f"{fmt_duration()} - [{res.status_code}] - {res.content.decode('utf8')}")
        res = await ac.post("/", content="query_name=bar&query_sequence=43")
        print(f"{fmt_duration()} - [{res.status_code}] - {res.content.decode('utf8')}")
        content = ""
        while not content.endswith('0 jobs in the backlog."]'):
            await asyncio.sleep(1)
            backlog_results = await ac.get("/backlog")
            content = backlog_results.content.decode("utf8")
            print(f"{fmt_duration()} - test_me - content: {content}")


def fmt_duration():
    return f"Progress time: {time.time() - start_ts:.3f}s"

loop = asyncio.get_event_loop()
print(f'starting loop...')
loop.run_until_complete(test_me())
duration = time.time() - start_ts
print(f'Finished. Duration: {duration:.3f} seconds.')

If I run the above in my local environment, I get this output:

starting loop...
Progress time: 0.005s - root - added task
Progress time: 0.006s - [200] - {"Query created":{"query_name":"foo","query_sequence":"42","experiment_id":"1627489235.9300923","status":"pending","experiment_dir":"/experiments/1627489235.9300923"},"Query ID":"1627489235.9300923","Backlog Length":1}
Progress time: 0.007s - root - added task
Progress time: 0.009s - [200] - {"Query created":{"query_name":"bar","query_sequence":"43","experiment_id":"1627489235.932097","status":"pending","experiment_dir":"/experiments/1627489235.932097"},"Query ID":"1627489235.932097","Backlog Length":2}
Progress time: 1.016s - test_me - content: ["Progress time: 1.015s - return_backlog - Currently 2 jobs in the backlog."]
Progress time: 2.008s - process - Query 1627489235.9300923 processing finished with return code 0.
Progress time: 2.008s - process - Query 1627489235.932097 processing finished with return code 0.
Progress time: 2.041s - test_me - content: ["Progress time: 2.041s - return_backlog - Currently 0 jobs in the backlog."]
Finished. Duration: 2.041 seconds.

I also tried making process_wrapper a function so that Starlette executes it in a new thread. This works the same way; just use run_coroutine_threadsafe instead of create_task, i.e.

def process_wrapper(query):
    loop = asyncio.get_event_loop()
    asyncio.run_coroutine_threadsafe(process(query), loop)

If there is some other way to get a background task to run without blocking the HTTP response I'd love to find out how, but absent that this wrapper solution should work.
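One detail worth adding to the wrapper approach: the asyncio documentation recommends keeping a reference to tasks made with create_task, so that a pending task is not garbage-collected mid-execution. A minimal sketch of a wrapper that does this is below. The names schedule, slow_job and _background_tasks are hypothetical, chosen only for the illustration.

```python
import asyncio

# Strong references to in-flight tasks, so they are not garbage-collected
_background_tasks: set = set()


def schedule(coro) -> asyncio.Task:
    """Start coro on the running loop and return at once, keeping a reference."""
    task = asyncio.get_running_loop().create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)  # drop it once finished
    return task


async def slow_job(results: list) -> None:
    await asyncio.sleep(0.05)  # stand-in for the long-running query
    results.append("done")


async def main() -> tuple:
    results: list = []
    task = schedule(slow_job(results))
    returned_before_finish = not task.done()  # wrapper returned, job still pending
    await task
    await asyncio.sleep(0)  # give the done-callback a chance to run
    return returned_before_finish, results, len(_background_tasks)
```

The wrapper returns immediately, the job finishes later on the same loop, and the set empties itself as tasks complete.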

Upvotes: 6

GwynBleidD

Reputation: 20539

I think your issue is in the task you want to run, not in the BackgroundTask itself.

FastAPI (and the underlying Starlette, which is responsible for running the background tasks) is built on top of asyncio and handles all requests asynchronously. That means that while one request is being processed, if it reaches an IO operation that supports the asynchronous approach, FastAPI will switch to the next request in the queue while that IO operation is pending.

The same goes for any background tasks added to the queue. While a background task is running, other requests or background tasks will be handled only when that task is waiting on an IO operation.
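The difference can be measured with plain asyncio. This is only an illustrative sketch: heartbeat stands in for "other requests" that the loop would like to serve, and all names here are made up for the example.

```python
import asyncio
import time


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    """Stands in for other requests: records a tick whenever the loop is free."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def count_ticks_during(task_factory) -> int:
    """Return how many heartbeat ticks happen while the given task runs."""
    ticks: list = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    before = len(ticks)
    await task_factory()
    during = len(ticks) - before
    stop.set()
    await beat
    return during


async def blocking_task() -> None:
    time.sleep(0.2)  # never yields, so the loop is stuck for the whole time


async def cooperative_task() -> None:
    await asyncio.sleep(0.2)  # yields to the loop while waiting
```

With blocking_task the heartbeat gets no ticks at all while the task runs. With cooperative_task it keeps ticking throughout.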

As you can see, this is not ideal when your view or task either doesn't have any IO operations or cannot be run asynchronously. There is a workaround for that situation:

  • declare your views or tasks as normal, non-asynchronous functions
    Starlette will then run them in a separate thread, outside of the main async loop, so other requests can be handled at the same time
  • manually run the part of your logic that may block the processing of other requests using asgiref.sync_to_async
    This will also cause this logic to be executed in a separate thread, releasing the main async loop to take care of other requests until the function returns.

If you are not doing any asynchronous IO operations in your long-running task, the first approach will be most suitable for you. Otherwise, you should take any part of your code that is either long-running or performs any non-asynchronous IO operations and wrap it with sync_to_async.
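Since the question also asks for jobs to run one at a time, here is a rough sketch that combines the thread approach with a queue and a single consumer. To keep it standard-library only it uses asyncio.to_thread rather than asgiref.sync_to_async, and heavy_job, worker and main are hypothetical names, not part of FastAPI or of the original code.

```python
import asyncio
import time


def heavy_job(name: str, log: list) -> None:
    """Blocking, non-async work (hypothetical stand-in for the real processing)."""
    log.append(f"start {name}")
    time.sleep(0.05)
    log.append(f"end {name}")


async def worker(queue: asyncio.Queue, log: list) -> None:
    """Single consumer: jobs run strictly one after another, each in a side thread."""
    while True:
        name = await queue.get()
        try:
            await asyncio.to_thread(heavy_job, name, log)  # loop stays free meanwhile
        finally:
            queue.task_done()


async def main() -> list:
    log: list = []
    queue: asyncio.Queue = asyncio.Queue()
    consumer = asyncio.create_task(worker(queue, log))
    for name in ("a", "b", "c"):
        queue.put_nowait(name)  # returns immediately, as a request handler would need
    await queue.join()  # wait until every queued job has been processed
    consumer.cancel()
    return log
```

In a FastAPI app, the worker task would presumably be started once at application startup and the POST handler would only put the query on the queue. That wiring is left out here, and truly CPU-bound work may still need a separate process rather than a thread.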

Upvotes: 1
