Guillermo
Guillermo

Reputation: 61

Exception closing connection using sqlalchemy with asyncio and postgresql

I have an API server using Python 3.7.10. I am using the FastAPI framework with sqlalchemy, asyncio, psycopg2-binary, asyncpg along with postgresql. I am deploying this using aws elasticbeanstalk. The application seems to work fine but everytime my frontend calls an endpoint, it seems like the connection is not closing correctly.

Error

Jun  1 21:17:33 web: ERROR:sqlalchemy.pool.impl.AsyncAdaptedQueuePool:Exception closing connection <AdaptedConnection <asyncpg.connection.Connection object at 0x7fd8b005cb90>>
Jun  1 21:17:33 web: Traceback (most recent call last):
Jun  1 21:17:33 web: File "/var/app/venv/staging-LQM1lest/lib64/python3.7/site-packages/sqlalchemy/pool/base.py", line 247, in _close_connection
Jun  1 21:17:33 web: self._dialect.do_close(connection)
Jun  1 21:17:33 web: File "/var/app/venv/staging-LQM1lest/lib64/python3.7/site-packages/sqlalchemy/engine/default.py", line 688, in do_close
Jun  1 21:17:33 web: dbapi_connection.close()
Jun  1 21:17:33 web: File "/var/app/venv/staging-LQM1lest/lib64/python3.7/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 749, in close
Jun  1 21:17:33 web: self.await_(self._connection.close())
Jun  1 21:17:33 web: File "/var/app/venv/staging-LQM1lest/lib64/python3.7/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
Jun  1 21:17:33 web: return current.driver.switch(awaitable)
Jun  1 21:17:33 web: File "/var/app/venv/staging-LQM1lest/lib64/python3.7/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
Jun  1 21:17:33 web: value = await result
Jun  1 21:17:33 web: File "/var/app/venv/staging-LQM1lest/lib64/python3.7/site-packages/asyncpg/connection.py", line 1334, in close
Jun  1 21:17:33 web: await self._protocol.close(timeout)
Jun  1 21:17:33 web: File "asyncpg/protocol/protocol.pyx", line 581, in close
Jun  1 21:17:33 web: concurrent.futures._base.CancelledError

Here is my setup for the engine and session:

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from app.model.base import CustomBase
from app.core.config import SQLALCHEMY_DATABASE_URI

engine = create_async_engine(SQLALCHEMY_DATABASE_URI)

SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    class_=AsyncSession,
    bind=engine,
    expire_on_commit=False,
)

I am using FastAPI's dependency injection to get the session with the following:

async def get_db() -> AsyncSession:
    async with SessionLocal() as session:
        yield session

This error only shows up in my deployment and not my local environment, and seems to only when using sqlalchemy asynchronously with asyncio. Thanks for the help!

Upvotes: 5

Views: 4222

Answers (4)

GodVov4
GodVov4

Reputation: 1

You need to choose the pool that is most suitable for your project. More pools here

NullPool

A Pool which does not pool connections.

Instead it literally opens and closes the underlying DB-API connection per each connection open/close.

Reconnect-related functions such as recycle and connection invalidation are not supported by this Pool implementation, since no connections are held persistently.

StaticPool

A Pool of exactly one connection, used for all requests.

Reconnect-related functions such as recycle and connection invalidation (which is also used to support auto-reconnect) are only partially supported right now and may not yield good results.

This pool was perfect for me
from sqlalchemy.pool import StaticPool
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from app.model.base import CustomBase
from app.core.config import SQLALCHEMY_DATABASE_URI

engine = create_async_engine(
    SQLALCHEMY_DATABASE_URI, pool_pre_ping=True, poolclass=StaticPool
)

SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    class_=AsyncSession,
    bind=engine,
    expire_on_commit=False,
)

Upvotes: 0

KObb
KObb

Reputation: 71

It's quite an old post, but just in case anyone would go around this...

You are yielding sessions, but you are not closing them. If you use the get_db() as dependency, FastAPI will take care to execute the code after the yield (at the end of the request lifetime), so if you do something like this:

async def get_db() -> AsyncSession:
    async with SessionLocal() as session:
        yield session
        session.close()

it will close (return to the pool) the connection for you.

Upvotes: 1

Guillermo
Guillermo

Reputation: 61

My problem was fixed by using NullPool class.

from sqlalchemy.pool import NullPool
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from app.model.base import CustomBase
from app.core.config import SQLALCHEMY_DATABASE_URI

engine = create_async_engine(
    SQLALCHEMY_DATABASE_URI, pool_pre_ping=True, poolclass=NullPool
)

SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    class_=AsyncSession,
    bind=engine,
    expire_on_commit=False,
)

Upvotes: 1

Norbert
Norbert

Reputation: 1

generally I had similar issue when using:

@app.middleware("http")
async def add_process_time_header(request: fastapi.Request, call_next):

Disabling middleware helped. Still trying to figure this out :)

Upvotes: 0

Related Questions