Semisonic
Semisonic

Reputation: 1173

asyncio.wait_for doesn't time out as expected

I'm debugging a FastAPI application, and I have a problem similar to what's been mentioned in this post: a call to asyncio.wait_for that should time out doesn't:

try:
    await wait_for(completion_event.wait(), 1.0)
except TimeoutError:
    logging.info("timeout")
    return SubmissionResult(post_id=post_id, language_check_pending=True)

This snippet is a part of a FastAPI's POST request handler. Here, completion_event is an asyncio.Event object. I can put a breakpoint on the line with wait_for, watch it get stuck for much more than 1s, and then move right past the except block. There's no doubt that wait_for doesn't do what it's expected to do.

I have no idea why it behaves like that. At this point, I'm starting to suspect the FastAPI's internals, since it uses uvloop as a "faster drop-in replacement for asyncio". But I don't know how to test this assumption, much less to fix this problem if it's indeed the case.

Any suggestions?

Upvotes: 5

Views: 7672

Answers (2)

Patrullero Matt
Patrullero Matt

Reputation: 11

I think this can be fixed using asyncio.timeout_at (present in Python 3.11) instead asyncio.wait_for.

I'm using python 3.9, so, as a workaround, I've created a decorator to use in the endpoints you want to raise a response 504: (place it in a file named abort_after.py)

import functools
import signal
import sys

from fastapi.responses import JSONResponse
from starlette import status


class TimeOutException(Exception):
    """It took longer than expected"""


def abort_after(max_execution_time):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            def handle_timeout(signum, frame):
                raise TimeOutException(f"Function execution took longer than {max_execution_time}s and was terminated")
            if sys.platform == 'win32':
                print("Won't be stopped in windows!")
            else:
                signal.signal(signal.SIGALRM, handle_timeout)
                signal.alarm(max_execution_time)
            result = func(*args, **kwargs)
            if sys.platform != 'win32':
                signal.alarm(0)
            return result
        return wrapper
    return decorator


def timeout_response() -> JSONResponse:
    return JSONResponse(
        {
            'detail': 'Request processing time excedeed limit',
        },
        status_code=status.HTTP_504_GATEWAY_TIMEOUT,
    )

Then you can use it in your endpoint:

import time
from fastapi import APIRouter
from abort_after import abort_after, TimeOutException, timeout_response

router = APIRouter()


@router.post(f"{URL_prefix}/test",
             tags=['Test'],
             )
async def test():
    try:
        long_func(60)
    except TimeOutException:
        return timeout_response()
    return {'Test': 'ok'}


@abort_after(5)
def long_func(seconds: int) -> None:
    time.sleep(seconds)

Upvotes: 1

Paul Cornelius
Paul Cornelius

Reputation: 10906

A possible explanation is that completion_event.wait() is raising an exception before the time delay has elapsed. A cancellation of the enclosing Task would count as an Exception. The following program illustrates this:

import asyncio

async def fail():
    await asyncio.sleep(0.5)
    # raise ValueError("Half a second")
    await asyncio.sleep(0.7)
    
async def cancel_point_three(task):
    await asyncio.sleep(0.3)
    task.cancel()
    
async def main():
    task = asyncio.create_task(fail())
    # asyncio.create_task(cancel_point_three(task))
        
    try:
        await asyncio.wait_for(task, 1.0)
    except asyncio.TimeoutError:
        print("Timed out")
    else:
        print("Did not time out")
        
asyncio.run(main())

Run this program as is and it will print "Timed out" as expected. Remove the comment in front of raise ValueError and the code drops all the way through to the end, without printing "Timed out". The Exception occurs before the timeout.

A similar thing happens if you instead remove the comment in front of asyncio.create_task(cancel_point_three(task)). There is no timeout, even though the method fail() did not raise any Exception.

Also note that there are two TimeoutError classes in the standard library. One of them is asyncio.TimeoutError, which is the one raised by asyncio.wait_for. There is also a TimeoutError in the basic exception heirarchy - it's a subclass of OSError and is not the one you want here.

Upvotes: 2

Related Questions