Rom D
Rom D

Reputation: 3

Use a psycopg3 async Pool with SQLAlchemy, SQLModel and Fastapi

I would like to avail the last improvements of async with psycopg3, SQLAlchemy, SQLModel and FastAPI. I found some references about such setup but never with a pool of connection to postgres.

Something is definetely missing as I have an error :

error connecting in 'pool-1': missing "=" after "postgresql+psycopg://postgres:[email protected]:5432/foo" in connection info string

# Database

#CREATE TABLE IF NOT EXISTS hero (
#    id INT PRIMARY KEY,
#    name VARCHAR(255) NOT NULL,
#    country VARCHAR(50) NOT NULL
#);

import asyncio
from fastapi import FastAPI, APIRouter
from functools import lru_cache
from psycopg_pool import AsyncConnectionPool

DATABASE_URL = "postgresql+psycopg://postgres:[email protected]:5432/foo"

class AsyncPoolProxy:
    def __init__(self, pool):
        self.pool = pool

    async def acquire(self):
        return await self.pool.acquire()

    async def release(self, conn):
        await self.pool.release(conn)

    def dispose(self):
        pass


async def check_async_connections(async_pool):
    while True:
        await asyncio.sleep(600)
        print("check connections")
        await async_pool.check()

@lru_cache()
def get_async_pool():
    return AsyncConnectionPool(conninfo=DATABASE_URL)

app = FastAPI(
    title="Test FASTAPI SQLALCHEMY SQLMODEL",
    description="TBD",
    version="0.1.0",
    contact={
        "name": "RD",
    },
)

async_pool = get_async_pool()


@app.on_event("startup")
def startup():
    asyncio.create_task(check_async_connections(async_pool))


engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # Pool handled directly
    connect_args={"server_settings": {"jit": "off"}},  # Deactivate JIT for better performances
)

async def get_session() -> AsyncSession:
    pool = get_async_pool()
    async_session = sessionmaker(
        bind=engine,
        class_=AsyncSession,
        expire_on_commit=False,
        pool=AsyncPoolProxy(pool)
    )
    async with async_session() as session:
        yield session


class Hero(SQLModel, table=True):
    __tablename__ = 'Hero'

    id: int = Field(primary_key=True)
    name: str
    country: str | None = None


@app.post("/heroes/")
def create_hero(hero: Hero, session: Session = Depends(get_session)):
    session.add(hero)
    session.commit()
    session.refresh(hero)
    return hero

@app.get("/heroes/{hero_id}")
def read_hero(hero_id: int, session: Session = Depends(get_session)):
    hero = session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    return hero

@app.get("/heroes/")
def read_heroes(session: Session = Depends(get_session)):
    heroes = session.exec(select(Hero)).all()
    return heroes
# Requirements
alembic==1.13.2
fastapi==0.112.1
psycopg==3.2.1
psycopg-binary==3.2.1
psycopg-pool==3.2.2
pydantic==2.8.2
pydantic_core==2.20.1
SQLAlchemy==2.0.32
sqlmodel==0.0.21
uvicorn==0.30.6

Did you have the chance to build such setup ? Do you know why my connection string is incorrect ?

Upvotes: 0

Views: 721

Answers (1)

Rom D
Rom D

Reputation: 3

As @Adrian mentionned in the comment, the connection string for psycopg should not include the driver while for sqlalchemy, it has too if you want to avoid psycopg2.

Upvotes: 0

Related Questions