How to avoid unnecesary Idle in transaction in FastAPI - SQLAlchemy application with repository pattern?

This question is inspired by the articles https://www.gorgias.com/blog/prevent-idle-in-transaction-engineering and https://capnfabs.net/posts/sqlalchemy-connection-management/.

SQLALchemy session lifecycle management in the context of the FastAPI framework is driving me crazy. At the moment we are writing repositories as they do in various tutorials on the Internet, as well as services, views. It looks like this.

Base repositories:

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker


engine = create_async_engine(settings.DB.async_dns, **settings.DB.OPTIONS)
Session = async_sessionmaker(bind=engine, expire_on_commit=False)


# dependency that manages commits
async def get_session():
    session = Session()
    try:
        yield session
        await session.commit()
    except Exception as e:
        await session.rollback()
        raise e
    finally:
        await session.close()


class BaseRepository:
    def __init__(self, session: AsyncSession = Depends(get_session)):
        self._session = session

    async def count(self, stmt: Select) -> int:
        stmt = stmt.with_only_columns(func.count(literal_column("1")), maintain_column_froms=True)
        return await self._session.scalar(stmt)

    async def update_instance(self, instance, update_data: dict):
        for field, value in update_data.items():
            setattr(instance, field, value)
        await self._session.flush()
        return instance

    async def scalar_or_raise(self, stmt, error: str = None):
        if result := await self._session.scalar(stmt):
            return result
        if error:
            raise NotFoundError(error)
        raise NotFoundError

    # another methods


class ModelRepository(BaseRepository):
    model = None

    def __init__(self, session: AsyncSession = Depends(get_session)):
        if not self.model:
            raise ValueError()
        super().__init__(session)

    async def get_by_pk(self, pk):
        if instance := await self._session.get(self.model, pk):
            return instance
        raise NotFoundError

    async def create(self, **kwargs):
        # commits are performed by the calling code
        instance = self.model(**kwargs)
        self._session.add(instance)
        return instance

    async def bulk_create(self, values: list[dict]):
        # commits are performed by the calling code
        instances = [self.model(**item) for item in values]
        self._session.add_all(instances)
        return instances

    # another methods

Let there be SQLAlchemy models:

class UserType(Base):
    # catalog model
    __tablename__ = "user_types"
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    code: Mapped[str] = mapped_column(unique=True)  # f.e. student, teacher, cleaner
    name: Mapped[str]  # f.e. Student, Teacher, Cleaner


class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    first_name: Mapped[str]
    last_name: Mapped[str]
    email: Mapped[str] = mapped_column(unique=True)
    type_id: Mapped[int] = mapped_column(ForeignKey("user_types.id", ondelete="cascade"))

Repositories for models:

from sqlalchemy import select


class UsersRepository(ModelRepository):
    model = User


class UserTypesRepository(ModelRepository):
    model = UserType

    # for catalog models I like doing methods like this:

    async def get_STUDENT_type(self):
        return await self.get_by_code("student")

    async def get_TEACHER_type(self):
        return await self.get_by_code("teacher")

    async def get_CLEANER_type(self):
        return await self.get_by_code("cleaner")

    async def get_by_code(self, code):
        stmt = select(self.model).where(self.model.code == code)
        return await self.scalar_or_raise(stmt, "User type not found")

For each endpoint, a service and repositories are created:


# create_user.py

from sqlalchemy import update


class InputSchema(BasicModel):
    first_name: str
    last_name: str
    email: str
    type_id: int


class Repository(BaseRepository):

    async def email_registered(self, email):
        # this method is being used only in this endpoint
        # I don't want to put in UsersRepository
        # sorry I couldn't figure out more realistic example
        stmt = select("1").where(User.email == email)
        return await self._session.scalar(stmt)

    async def some_method_that_modifies_something(self):
        stmt = update(...)
        await self._session.execute(stmt)
        # sorry I couldn't figure out example



class Service:

    def __init__(
        self,
        repository: Repository = Depends(Repository),
        users_repository: UsersRepository = Depends(UsersRepository),
        user_types_repository: UserTypesRepository = Depends(UserTypesRepository),
    ):
        # all repositories share the same SQLAlchemy session
        self.repository = repository
        self.users_repository = users_repository
        self.user_types_repository = user_types_repository

    async def create_user(self, input: InputSchema):
        # first check if email already registered
        # this check is being done first because sql request will be faster then http request further
        # transaction is being opened
        if self.repository.email_registered(input.email):
            raise HTTPException(status_code=400, detail="User already registered")
        # idle in transaction
        await self.repository.some_method_that_modifies_something()
        # idle in transaction
        # here we cannot commit to stop transaction for optimization purposes before "long" http request
        # because I need to commit it all or nothing
        # http request lasts about 0.5 second
        if not self.user_exists_in_keycloak(input.email):
            raise HTTPException(status_code=400, detail="User does not exist in keycloak")
        type = await self.user_types_repository.get_by_pk(input.type_id)
        # idle in transaction
        if not type:
            raise HTTPException(status_code=400, detail="User type not found")
        # idle in transaction
        user = self.users_repository.create(first_name=input.first_name, last_name=input.last_name, type=type)
        # idle in transaction
        return user


    async def user_exists_in_keycloak(self, email) -> bool:
        # http request to keycloak to represent "long" pause when connection and transaction are idle uselessly


@app.post("/user")  # app is app = FastAPI()
async def create_user(input: InputSchema, service: Service = Depends(Service)):
    return await service.create_user(input)

The above example of the service-repository connection works, but there are questions about the use of DB resources (transactions, connections).

You can see in the Service.create_user method the transaction is opened on the first request and closed in dependency get_session(). During an http request to keycloak the transaction is idle for a long time. This takes up resources. In addition to the transaction, the connection is uselessly idle. All this wastes DB resources, as far as I can tell. I would like to get rid of these idle times.

QUESTION: how to get rid of these idle times?

I see optimization this way. After each select request a commit must be performed, and the changes are recorded once at the end in dependency get_session(). But it will probably be strange to manually do session.commit() after calling each select method?

And probably two sessions will be required here: one with AUTOCOMMIT for select queries, the second - in which changes will be accumulated and which will be committed at the end?

Upvotes: 2

Views: 91

Answers (1)

Aryan Raj
Aryan Raj

Reputation: 292

You're running into the classic "idle in transaction" problem with SQLAlchemy and FastAPI. Here's a more optimized approach:

# Create two session factories
ReadSession = async_sessionmaker(bind=engine, expire_on_commit=False, autocommit=True)
WriteSession = async_sessionmaker(bind=engine, expire_on_commit=False)

# Context manager for explicit transaction control
@asynccontextmanager
async def transaction(session):
    try:
        yield session
        await session.commit()
    except Exception as e:
        await session.rollback()
        raise e

Then in your service:

class UserService:
    def __init__(self, read_session=Depends(get_read_session), write_session=Depends(get_write_session)):
        self.read_session = read_session
        self.write_session = write_session
        self.users_repo_read = UsersRepository(read_session)
        self.users_repo_write = UsersRepository(write_session)
    
    async def create_user(self, input_data):
        # Read operations with autocommit
        if await self.users_repo_read.email_registered(input_data["email"]):
            raise HTTPException(status_code=400, detail="User already registered")
        
        # External HTTP call - no transaction open
        keycloak_user_exists = await self.user_exists_in_keycloak(input_data["email"])
        
        # Write operation with explicit transaction
        async with transaction(self.write_session):
            user = await self.users_repo_write.create(...)
            return user

This separates read/write operations and ensures transactions are only open when needed. No more idle connections during HTTP calls!

Upvotes: 1

Related Questions