Reputation: 857
This question is inspired by the articles https://www.gorgias.com/blog/prevent-idle-in-transaction-engineering and https://capnfabs.net/posts/sqlalchemy-connection-management/.
SQLAlchemy session lifecycle management in the context of the FastAPI framework is driving me crazy. At the moment we write repositories the way various tutorials on the Internet do, as well as services and views. It looks like this.
Base repositories:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
# Async engine built from project settings: settings.DB.async_dns is passed as
# the first argument (presumably the database URL/DSN — confirm) and
# settings.DB.OPTIONS is expanded into keyword arguments.
engine = create_async_engine(settings.DB.async_dns, **settings.DB.OPTIONS)
# Session factory bound to the engine above.
# NOTE(review): expire_on_commit=False presumably keeps already-loaded
# attributes readable after commit — confirm against the SQLAlchemy docs.
Session = async_sessionmaker(bind=engine, expire_on_commit=False)
# dependency that manages commits
async def get_session():
    """Yield a session, then commit, or roll back and re-raise on error.

    The session is created from the module-level ``Session`` factory and is
    closed on every exit path.

    Yields:
        The session shared by everything that depends on this function.

    Raises:
        Exception: whatever the consumer (or the commit itself) raised,
            re-raised unchanged after the rollback.
    """
    # ``async with`` closes the session on every exit path, replacing the
    # hand-written try/finally.
    async with Session() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            # Bare ``raise`` re-raises the active exception as-is.
            raise
class BaseRepository:
    """Helper methods built on top of an injected ``AsyncSession``."""

    def __init__(self, session: AsyncSession = Depends(get_session)):
        self._session = session

    async def count(self, stmt: Select) -> int:
        """Run ``stmt`` with its column list replaced by ``COUNT(1)``.

        The original FROM clauses are kept (``maintain_column_froms=True``)
        and the scalar result of the rewritten statement is returned.
        """
        count_stmt = stmt.with_only_columns(
            func.count(literal_column("1")),
            maintain_column_froms=True,
        )
        return await self._session.scalar(count_stmt)

    async def update_instance(self, instance, update_data: dict):
        """Set every ``update_data`` item as an attribute, flush, return ``instance``.

        Only ``flush()`` is called here; this method never commits.
        """
        for field, value in update_data.items():
            setattr(instance, field, value)
        await self._session.flush()
        return instance

    async def scalar_or_raise(self, stmt, error: str = None):
        """Return the scalar result of ``stmt`` or raise ``NotFoundError``.

        Args:
            stmt: statement passed to ``session.scalar``.
            error: optional message for the raised ``NotFoundError``.

        Raises:
            NotFoundError: when the scalar result is ``None``.
        """
        result = await self._session.scalar(stmt)
        # Compare against None explicitly: a legitimate falsy scalar
        # (0, "", False) is a found value, not a missing row.
        if result is not None:
            return result
        if error:
            raise NotFoundError(error)
        raise NotFoundError

    # other methods
class ModelRepository(BaseRepository):
    """Repository tied to one mapped class through the ``model`` attribute."""

    # Subclasses override this with the mapped class they manage.
    model = None

    def __init__(self, session: AsyncSession = Depends(get_session)):
        if not self.model:
            # Say which subclass is misconfigured instead of raising an
            # empty ValueError.
            raise ValueError(
                f"{type(self).__name__}.model must be set to a mapped class"
            )
        super().__init__(session)

    async def get_by_pk(self, pk):
        """Return the ``model`` row with primary key ``pk``.

        Raises:
            NotFoundError: when ``session.get`` returns ``None``.
        """
        instance = await self._session.get(self.model, pk)
        # Test for None rather than truthiness so an instance whose class
        # defines __bool__/__len__ is never mistaken for "missing".
        if instance is None:
            raise NotFoundError
        return instance

    async def create(self, **kwargs):
        """Build a ``model`` instance from ``kwargs`` and add it to the session."""
        # commits are performed by the calling code
        instance = self.model(**kwargs)
        self._session.add(instance)
        return instance

    async def bulk_create(self, values: list[dict]):
        """Build one ``model`` instance per dict and add them all to the session."""
        # commits are performed by the calling code
        instances = [self.model(**item) for item in values]
        self._session.add_all(instances)
        return instances

    # other methods
Let there be SQLAlchemy models:
class UserType(Base):
    """Catalog model stored in the ``user_types`` table."""

    __tablename__ = "user_types"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    # e.g. student, teacher, cleaner
    code: Mapped[str] = mapped_column(unique=True)
    # e.g. Student, Teacher, Cleaner
    name: Mapped[str]
class User(Base):
    """Mapped class for the ``users`` table."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    first_name: Mapped[str]
    last_name: Mapped[str]
    # Declared unique at the column level.
    email: Mapped[str] = mapped_column(unique=True)
    # Foreign key to user_types.id, declared with ondelete="cascade".
    type_id: Mapped[int] = mapped_column(
        ForeignKey("user_types.id", ondelete="cascade")
    )
Repositories for models:
from sqlalchemy import select
class UsersRepository(ModelRepository):
    """``ModelRepository`` specialised for the ``User`` model."""

    model = User
class UserTypesRepository(ModelRepository):
    """``ModelRepository`` specialised for the ``UserType`` catalog."""

    model = UserType

    async def get_by_code(self, code):
        """Return the row whose ``code`` column equals ``code``.

        Delegates to ``scalar_or_raise``, which raises when nothing matches.
        """
        by_code = select(self.model).where(self.model.code == code)
        return await self.scalar_or_raise(by_code, "User type not found")

    # for catalog models I like writing one named accessor per entry:
    async def get_STUDENT_type(self):
        """Shortcut for ``get_by_code("student")``."""
        return await self.get_by_code("student")

    async def get_TEACHER_type(self):
        """Shortcut for ``get_by_code("teacher")``."""
        return await self.get_by_code("teacher")

    async def get_CLEANER_type(self):
        """Shortcut for ``get_by_code("cleaner")``."""
        return await self.get_by_code("cleaner")
For each endpoint, a service and repositories are created:
# create_user.py
from sqlalchemy import update
# Input schema consumed by Service.create_user and the POST /user endpoint.
class InputSchema(BasicModel):
    first_name: str
    last_name: str
    email: str
    # Primary key of the user type; looked up via get_by_pk in the service.
    type_id: int
class Repository(BaseRepository):
    """Queries used only by the create-user endpoint."""

    async def email_registered(self, email):
        """Return a truthy scalar if a user with ``email`` exists, else ``None``."""
        # this method is used only in this endpoint,
        # so I don't want to put it in UsersRepository
        # sorry, I couldn't figure out a more realistic example
        #
        # select() rejects a bare string such as "1" as a column expression;
        # wrap it in literal_column(), as BaseRepository.count does.
        from sqlalchemy import literal_column

        stmt = select(literal_column("1")).where(User.email == email)
        return await self._session.scalar(stmt)

    async def some_method_that_modifies_something(self):
        """Execute an UPDATE statement through the shared session (placeholder)."""
        stmt = update(...)
        await self._session.execute(stmt)
        # sorry, I couldn't figure out an example
class Service:
    """Service behind the create-user endpoint."""

    def __init__(
        self,
        repository: Repository = Depends(Repository),
        users_repository: UsersRepository = Depends(UsersRepository),
        user_types_repository: UserTypesRepository = Depends(UserTypesRepository),
    ):
        # all repositories share the same SQLAlchemy session
        self.repository = repository
        self.users_repository = users_repository
        self.user_types_repository = user_types_repository

    async def create_user(self, input: InputSchema):
        """Validate ``input`` and add a new ``User`` to the session.

        Raises:
            HTTPException: status 400 when the email is already registered,
                the user is unknown to Keycloak, or the user type is missing.
        """
        # first check if the email is already registered
        # this check is done first because the SQL query is faster than the
        # HTTP request further down
        # the transaction is opened here
        # (the coroutine must be awaited: an un-awaited coroutine object is
        # always truthy, so every request would be rejected)
        if await self.repository.email_registered(input.email):
            raise HTTPException(status_code=400, detail="User already registered")
        # idle in transaction
        await self.repository.some_method_that_modifies_something()
        # idle in transaction
        # here we cannot commit to end the transaction before the "long"
        # http request, because everything must be committed all or nothing
        # the http request lasts about 0.5 second
        # (awaited for the same reason as above: without ``await`` this
        # check could never fail)
        if not await self.user_exists_in_keycloak(input.email):
            raise HTTPException(status_code=400, detail="User does not exist in keycloak")
        # get_by_pk raises NotFoundError instead of returning a falsy value,
        # so translate that error rather than testing the result.
        try:
            user_type = await self.user_types_repository.get_by_pk(input.type_id)
        except NotFoundError as exc:
            raise HTTPException(status_code=400, detail="User type not found") from exc
        # idle in transaction
        # ``create`` is a coroutine and must be awaited, otherwise nothing is
        # added to the session. ``User`` declares ``email`` and ``type_id``
        # (there is no ``type`` attribute), so pass those.
        user = await self.users_repository.create(
            first_name=input.first_name,
            last_name=input.last_name,
            email=input.email,
            type_id=user_type.id,
        )
        # idle in transaction
        return user

    async def user_exists_in_keycloak(self, email) -> bool:
        """Report whether ``email`` is known to Keycloak (body elided in the example)."""
        # http request to keycloak, representing the "long" pause during which
        # the connection and transaction are uselessly idle
        raise NotImplementedError("Keycloak lookup is not shown in this example")
# app is app = FastAPI()
# Endpoint for POST /user: hands the parsed body to the injected Service.
@app.post("/user")
async def create_user(input: InputSchema, service: Service = Depends(Service)):
    created = await service.create_user(input)
    return created
The above example of the service-repository connection works, but there are questions about the use of DB resources (transactions, connections).
As you can see, in the Service.create_user method the transaction is opened by the first query and closed in the get_session() dependency.
During the HTTP request to Keycloak the transaction sits idle for a long time. This takes up resources. In addition to the transaction, the connection is uselessly idle. All this wastes DB resources, as far as I can tell. I would like to get rid of these idle times.
QUESTION: how to get rid of these idle times?
I see the optimization this way: a commit must be performed after each select query, and the changes are written once at the end, in the get_session() dependency.
But it would probably be strange to call session.commit() manually after each select method?
And two sessions would probably be required here: one with AUTOCOMMIT for select queries, and a second one in which changes are accumulated and which is committed at the end?
Upvotes: 2
Views: 91
Reputation: 292
You're running into the classic "idle in transaction" problem with SQLAlchemy and FastAPI. Here's a more optimized approach:
# Create two session factories
# The read factory is bound to a copy of the engine whose connections run at
# the AUTOCOMMIT isolation level. SQLAlchemy 2.0 sessions no longer support
# ``autocommit=True``, so the setting has to live on the engine.
# NOTE(review): a session still keeps its connection checked out until it is
# committed, rolled back or closed — confirm this removes the idle time you
# are measuring.
ReadSession = async_sessionmaker(
    bind=engine.execution_options(isolation_level="AUTOCOMMIT"),
    expire_on_commit=False,
)
WriteSession = async_sessionmaker(bind=engine, expire_on_commit=False)
# Context manager for explicit transaction control
@asynccontextmanager
async def transaction(session):
    """Commit ``session`` when the block succeeds, roll back when it fails.

    Args:
        session: object exposing awaitable ``commit()`` and ``rollback()``.

    Yields:
        The same ``session`` object.

    Raises:
        BaseException: whatever the block (or the commit) raised, re-raised
            unchanged after the rollback.
    """
    try:
        yield session
        await session.commit()
    except BaseException:
        # BaseException rather than Exception: asyncio.CancelledError (e.g. a
        # cancelled request) is not an Exception subclass and would otherwise
        # skip the rollback.
        await session.rollback()
        # Bare ``raise`` re-raises the active exception as-is.
        raise
Then in your service:
class UserService:
    """Create-user service that uses separate read and write sessions."""

    # NOTE(review): get_read_session / get_write_session are not shown in the
    # answer; presumably they yield sessions from ReadSession / WriteSession.
    def __init__(self, read_session=Depends(get_read_session), write_session=Depends(get_write_session)):
        self.read_session = read_session
        self.write_session = write_session
        self.users_repo_read = UsersRepository(read_session)
        self.users_repo_write = UsersRepository(write_session)

    async def create_user(self, input_data):
        """Check the email, check Keycloak, then create the user in a write transaction.

        Raises:
            HTTPException: status 400 when the email is already registered or
                the user does not exist in Keycloak.
        """
        email = input_data["email"]
        # Read operation on the read session.
        # NOTE(review): in the question, email_registered is defined on the
        # endpoint-local Repository, not on UsersRepository — confirm which
        # repository should be used here.
        if await self.users_repo_read.email_registered(email):
            raise HTTPException(status_code=400, detail="User already registered")
        # External HTTP call, made before the write transaction is opened.
        # The result must be checked; otherwise the user is created even when
        # Keycloak does not know the email.
        # NOTE(review): user_exists_in_keycloak is not defined on this class
        # in the answer as posted.
        keycloak_user_exists = await self.user_exists_in_keycloak(email)
        if not keycloak_user_exists:
            raise HTTPException(status_code=400, detail="User does not exist in keycloak")
        # Write operation with explicit transaction
        async with transaction(self.write_session):
            user = await self.users_repo_write.create(...)
        return user
This separates read/write operations and ensures transactions are only open when needed. No more idle connections during HTTP calls!
Upvotes: 1