Tom

Reputation: 621

Use a trio nursery as a generator for Server-Sent Events with FastAPI?

I'm trying to build a Server-Sent Events endpoint with FastAPI but I'm unsure if what I'm trying to accomplish is possible or how I would go about doing it.

Introduction to the problem

Say I have an async function run_task(limit, task) that sends a request, makes a transaction, or something similar, and that returns some JSON data for each task.

I'd like to run multiple tasks (multiple calls to run_task(limit, task)) concurrently. To do so, I'm using trio and nurseries like so:

async with trio.open_nursery() as nursery:
    limit = trio.CapacityLimiter(10)
    for task in tasks:
        nursery.start_soon(run_task, limit, task)

Finally, I want to return the results of each task via a FastAPI endpoint.

At first, I simply created an object containing a list and passed that object (by reference) to each run_task. When a task finished, it appended its JSON data to the list as a dictionary, and the endpoint returned the whole object once all the tasks were done.

This works, but I find it inefficient: the client has to wait for every task to finish before it can display anything. Some tasks can be quite slow, so the data already fetched by the faster tasks just sits there unused.

What I would like to accomplish

Whenever a task is finished, I'd want the API to directly return the data of said task (that I would've previously added to the object) so that the client can display said data in real-time.

That's when I discovered Server-Sent Events and WebSockets. Server-Sent Events seemed like the appropriate solution to my problem, as I don't need bidirectional communication.

Since FastAPI is built on Starlette, I decided to use sse-starlette to build an endpoint with Server-Sent Events. To do so, I need to build an endpoint like so:

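from fastapi import APIRouter, Request
from sse_starlette.sse import EventSourceResponse

router = APIRouter()

@router.get('/stream')
async def runTasks(
        param1: str,
        request: Request
):
    # status_event_generator is the async generator I still need to write
    event_generator = status_event_generator(request, param1)
    return EventSourceResponse(event_generator)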

The actual problem

As the name status_event_generator implies, EventSourceResponse needs to be given an event generator, and that's where I'm stuck. I'd want the generator to yield the data of a task as soon as that task finishes (so that the client receives the data of each task in real time). However, the tasks run inside the trio nursery, so I'm unsure how to proceed.

As per "Is yielding from inside a nursery in an asynchronous generator function bad?", it seems (if I understand correctly) that I can't just put a yield in run_task(limit, task) and expect it to work.

Upvotes: 3

Views: 876

Answers (1)

Tom

Reputation: 621

Solution with WebSockets

I ultimately decided to go with WebSockets rather than SSE, as I realised I needed to pass an object as data to my endpoint. While SSE can accept query params, dealing with objects as query parameters was too much of a hassle.

WebSockets in FastAPI are provided by Starlette and are pretty easy to use. Applying them to the problem above can be done like so:

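import trio
from fastapi import APIRouter, WebSocket

router = APIRouter()

@router.websocket('/stream')
async def runTasks(
        websocket: WebSocket
):
    # Initialise websocket
    await websocket.accept()

    # Receive the list of tasks from the client
    tasks = await websocket.receive_json()

    # Run the tasks concurrently; each one sends its own result
    async with trio.open_nursery() as nursery:
        limit = trio.CapacityLimiter(10)
        for task in tasks:
            nursery.start_soon(run_task, limit, task, websocket)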

To return data, we can then simply call await websocket.send_json() in run_task. (This is a simplified example; you'd preferably want to handle websocket closures and edge cases with your nursery.)

Solution with SSE

To answer the original problem: thanks to @user3840170 and https://discuss.python.org/t/preventing-yield-inside-certain-context-managers/1091, we should be able to solve it by opening a nursery in a wider scope, one that contains the loop iterating over the generator, and then using that nursery inside the generator to spawn the background tasks. That way the generator never yields from inside a nursery block of its own.

Upvotes: 2
