bcherb2
bcherb2

Reputation: 11

RuntimeError: "Timeout context manager should be used inside a task" with pytest-asyncio but works in direct Python execution

I'm using asyncio and aiohttp in an application I'm building, and I can't figure out how to get pytest to play nicely with them. Whenever I run the tests with pytest, I get:

RuntimeError: Timeout context manager should be used inside a task

If I call the same functions that pytest is calling directly from main(), the problem seems to go away. I uploaded a repo that reproduces the issue at https://github.com/bcherb2/async_bug

I have tried just about every solution and hack I can find, and nothing seems to work (nest_asyncio, pytest plugins, etc.)

Here is the failing code:

#api_client.py
import aiohttp
import uuid
import json
from enum import Enum
from typing import Optional, Dict, Any
from loguru import logger

class RetCode(Enum):
    """Return codes; the values appear to mirror the matching HTTP status codes."""

    # NOTE(review): not referenced by any other code visible in this snippet.
    NO_ERROR = 200
    BAD_REQUEST = 400
    UNAUTHORIZED = 401
    NOT_FOUND = 404


class DemoAPIClient:
    """Demo async REST client (simulates behavior similar to ANTServerRESTClient).

    One ``aiohttp.ClientSession`` is created lazily on first use and reused
    until it is closed. ``login()`` must succeed before ``rest()`` will
    send anything.
    """

    def __init__(
        self,
        base_url: str = "https://jsonplaceholder.typicode.com",
        timeout: int = 30
    ):
        """Record connection settings; no session is created here.

        Args:
            base_url: Prefix prepended to every endpoint path.
            timeout: Timeout in seconds, used as the ``total`` of the
                session's ``aiohttp.ClientTimeout``.
        """
        self.base_url, self.timeout = base_url, timeout

        # Both remain None until the first request / a successful login().
        self._session: Optional[aiohttp.ClientSession] = None
        self._session_token: Optional[str] = None

    async def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the live session, building a new one if missing or closed."""
        existing = self._session
        if existing is not None and not existing.closed:
            return existing

        # force_close=True: the connector closes each connection after use
        # instead of keeping it alive for reuse.
        self._session = aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(force_close=True),
            timeout=aiohttp.ClientTimeout(total=self.timeout),
        )
        return self._session

    async def close(self) -> None:
        """Close the session if there is one, then drop the reference to it."""
        if not self._session:
            return
        await self._session.close()
        self._session = None
        logger.debug("Session closed")

    async def login(self) -> None:
        """Simulate a login by fetching ``/posts/1``.

        On HTTP 200 a random UUID string is stored as the session token.

        Raises:
            aiohttp.ClientResponseError: If the response status is not 200.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        try:
            test_url = f"{self.base_url}/posts/1"
            session = await self._ensure_session()

            async with session.get(test_url) as response:
                if response.status == 200:
                    # Simulate session token
                    self._session_token = str(uuid.uuid4())
                    logger.info("Successfully logged in to API")
                else:
                    raise aiohttp.ClientResponseError(
                        request_info=response.request_info,
                        history=response.history,
                        status=response.status,
                        message=f"Login failed with status {response.status}"
                    )

        except Exception as e:
            logger.error(f"Login failed: {str(e)}")
            raise

    async def rest(
        self,
        endpoint: str,
        method: str,
        data: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Send one authenticated request and return the decoded JSON body.

        Args:
            endpoint: The endpoint path (e.g., '/posts'), appended to base_url.
            method: HTTP method (GET, POST, etc.).
            data: Optional request body, sent as JSON.

        Returns:
            The response body parsed with ``json.loads``.

        Raises:
            RuntimeError: If no session token is set (login() not called).
            aiohttp.ClientResponseError: If the response status is >= 400.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        if not self._session_token:
            raise RuntimeError("Not logged in. Call login() first")

        session = await self._ensure_session()
        # Short id used only to correlate the log lines of this request.
        request_id = uuid.uuid4().hex[:8]
        url = f"{self.base_url}{endpoint}"

        try:
            logger.debug(f"[{request_id}] {method} {url}")
            if data:
                logger.debug(f"[{request_id}] Request body: {data}")

            pending = session.request(
                method=method,
                url=url,
                json=data,
                headers={"Authorization": f"Bearer {self._session_token}"},
            )
            async with pending as response:
                response_text = await response.text()
                logger.debug(f"[{request_id}] Response: {response_text}")

                if response.status < 400:
                    return json.loads(response_text)

                raise aiohttp.ClientResponseError(
                    request_info=response.request_info,
                    history=response.history,
                    status=response.status,
                    message=f"Request failed: {response_text}"
                )

        except Exception as e:
            logger.error(f"[{request_id}] Request failed: {str(e)}")
            raise

#conftest.py

import pytest_asyncio
from loguru import logger
from api_client import DemoAPIClient

def pytest_configure(config):
    """pytest hook: set the ``asyncio_mode`` option to ``"auto"`` on *config*.

    NOTE(review): presumably so pytest-asyncio picks up coroutine tests
    without explicit markers; confirm this override is honored by the
    installed pytest-asyncio version.
    """
    options = config.option
    options.asyncio_mode = "auto"


@pytest_asyncio.fixture(scope="module")
async def api_client():
    """Fixture to provide an authenticated API client.

    Declared with ``scope="module"``. It logs the client in before yielding
    it, and always closes it afterwards, even if ``login()`` raises.
    """
    # NOTE(review): the tests that consume this fixture use a bare
    # `@pytest.mark.asyncio`. If pytest-asyncio runs each test on its own
    # function-scoped event loop while this module-scoped fixture ran
    # login() on a different loop, the aiohttp session created here would
    # belong to a loop other than the one executing the test. That is a
    # plausible source of "Timeout context manager should be used inside a
    # task". Confirm, e.g. by giving the fixture and the tests the same
    # loop scope.
    logger.info("Setting up API client")
    client = DemoAPIClient()
    
    try:
        await client.login()
        logger.info("API client logged in successfully")
        yield client
    finally:
        # Runs at fixture teardown and also when login() fails.
        await client.close()
        logger.info("API client closed")


#test_api_client.py

import pytest
import asyncio
from loguru import logger
from api_client import DemoAPIClient


async def ensure_task_context():
    """Attempted workaround: await a throwaway task when no task is current.

    Returns immediately if ``asyncio.current_task()`` reports a task for the
    running loop. Note that the aiohttp check shown in the traceback queries
    ``asyncio.current_task(loop=self._loop)``, i.e. the loop stored on its
    timer object, which this helper does not look at.
    """
    if asyncio.current_task() is not None:
        return
    await asyncio.create_task(asyncio.sleep(0))


@pytest.mark.asyncio
async def test_client_setup(api_client):
    """Check that the client has both a session token and a session."""
    logger.debug("Testing client setup")
    # Checked in this order: token first, then the underlying session.
    for attr_name in ("_session_token", "_session"):
        assert getattr(api_client, attr_name) is not None
    logger.debug("Client setup verified")


@pytest.mark.asyncio
async def test_get_post(api_client):
    """Test retrieving a post.

    Issues ``GET /posts/1`` through the client and checks that the decoded
    response contains ``"id"`` equal to 1.
    """
    await ensure_task_context()  # Try to ensure task context
    
    try:
        response = await api_client.rest("/posts/1", "GET")
        assert response is not None
        assert "id" in response
        assert response["id"] == 1
    except Exception as e:
        # Log request errors and assertion failures alike, then let the
        # exception propagate so the test still fails.
        logger.error(f"Test failed: {str(e)}")
        raise


@pytest.mark.asyncio
async def test_create_post(api_client):
    """POST a new post and check the response has an id and the sent title."""
    await ensure_task_context()  # Try to ensure task context

    # Constant payload: building it cannot fail, so it sits outside the try.
    new_post = {"title": "Test Post", "body": "Test Content", "userId": 1}

    try:
        response = await api_client.rest("/posts", "POST", new_post)
        assert response is not None
        assert "id" in response
        assert response["title"] == "Test Post"
    except Exception as e:
        # Log, then re-raise so the failure is still reported.
        logger.error(f"Test failed: {str(e)}")
        raise


async def main():
    """Run the three test coroutines directly, without pytest.

    Creates and logs in a fresh client, passes it to each test function in
    turn, and always closes the client at the end. Any failure is logged
    and re-raised.
    """
    logger.info("Starting direct test execution")

    client = DemoAPIClient()

    # (message before, test coroutine function, message after), in run order.
    steps = (
        ("Running test_client_setup", test_client_setup, "Client setup test passed"),
        ("Running test_get_post", test_get_post, "Get post test passed"),
        ("Running test_create_post", test_create_post, "Create post test passed"),
    )

    try:
        await client.login()
        logger.info("Client logged in")

        for before_msg, test_fn, after_msg in steps:
            logger.info(before_msg)
            await test_fn(client)
            logger.info(after_msg)

    except Exception as e:
        logger.error(f"Test execution failed: {str(e)}")
        raise
    finally:
        logger.info("Cleaning up client")
        await client.close()
        logger.info("Client closed")


# Direct execution (`python test_api_client.py`): run main() on a fresh event
# loop via asyncio.run, bypassing pytest entirely.
if __name__ == "__main__":
    asyncio.run(main())

Full trace of one of the failing tests:

________________________________________________________________________________________ test_create_post _________________________________________________________________________________________

api_client = <api_client.DemoAPIClient object at 0x7eeac5875a90>

    @pytest.mark.asyncio
    async def test_create_post(api_client):
        """Test creating a new post."""
        await ensure_task_context()  # Try to ensure task context
    
        try:
            new_post = {
                "title": "Test Post",
                "body": "Test Content",
                "userId": 1
            }
>           response = await api_client.rest("/posts", "POST", new_post)

test_api_client.py:51: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
api_client.py:110: in rest
    async with session.request(
.venv/lib/python3.12/site-packages/aiohttp/client.py:1425: in __aenter__
    self._resp: _RetType = await self._coro
.venv/lib/python3.12/site-packages/aiohttp/client.py:607: in _request
    with timer:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <aiohttp.helpers.TimerContext object at 0x7eeac43b19f0>

    def __enter__(self) -> BaseTimerContext:
        task = asyncio.current_task(loop=self._loop)
        if task is None:
>           raise RuntimeError("Timeout context manager should be used inside a task")
E           RuntimeError: Timeout context manager should be used inside a task

.venv/lib/python3.12/site-packages/aiohttp/helpers.py:636: RuntimeError

To reproduce, run `pytest test_api_client.py` (fails with the error above) and then `python test_api_client.py` (works). Why is this failing? Is there any way to fix this?

Upvotes: 1

Views: 83

Answers (0)

Related Questions