Reputation: 1775
I try to implement DI in my console app using depedency_injector
I have some rules that every request should use the same DB Connection to all dependencies, so when there is a request it will create a new DB Connection, 1 DbConnection each request so I can make this transactional (ACID).
Here is my code, is still create a new DB connection on all dependencies. I don't want to use Singleton since it only used 1 connection.
Database.py
engine_mysql = create_engine(DB_MYSQL_DSN, pool_size=20, max_overflow=0)
session_mysql = sessionmaker(autocommit=False, autoflush=False, bind=engine_mysql)
class Database:
_session_mysql: Session
_session_psql: Session
def __init__(self) -> None:
print(100 * "=")
print("SESSION CREATED")
print(100 * "=")
self._session_mysql = session_mysql()
def get_mysql_session(self):
print(100 * "=")
print("SESSION CALL MYSQL")
print(100 * "=")
return self._session_mysql
DatabaseContainer.py
class DatabaseContainer(containers.DeclarativeContainer):
database = providers.Factory(Database)
db_mysql = providers.Singleton(database.provided.get_mysql_session)
RepositoryContainer.py
class RepositoryContainer(containers.DeclarativeContainer):
database = providers.DependenciesContainer()
transaction = providers.Factory(TransactionRepository, session=database.db_mysql)
transaction_package = providers.Factory(
TransactionPackageRepository,
session=database.db_psql,
)
ServiceContainer.py
class ServiceContainer(containers.DeclarativeContainer):
repository = providers.DependenciesContainer()
database = providers.DependenciesContainer()
order = providers.Factory(
orderService,
# database
db_conn=database.db_mysql,
# repository
transaction_repo=repository.transaction,
transaction_package_repo=repository.transaction_package,
)
AppContainer.py
class AppContainer(containers.DeclarativeContainer):
database = providers.Container(DatabaseContainer)
repository = providers.Container(RepositoryContainer, database=database)
service = providers.Container(
ServiceContainer,
repository=crm_repository,
database=database
)
Client Code
class OrderWatcherJob(BaseJob):
def __init__(self):
self._logger = logging.getLogger(__name__)
self.func = self.execute_job
@inject
def execute_job(
self,
order_service: OrderService = Provide[AppContainer.service.order],
db_mysql:Session = Provide[AppContainer.database.db_mysql]
):
try:
order_service.execute_something()
db_mysql.commit()
#or order_service.db_conn.commit()
except Exception as ex:
self._logger.error(ex)
db_mysql.rollback()
#or order_service.db_conn.rollback()
finally:
db_mysql.close()
#or order_service.db_conn.close()
So.. how do I make this become used only 1 DB Connection every request ? I mean when I inject my OrderService it will use the same DB Connection on TransactionRepo, TransactionPackageRepo, and also db_conn.
Upvotes: 5
Views: 1260
Reputation: 149
To use only one session per request (using FastAPI, which can be scoped by the asyncio loop), the library provides the ContextLocalSingleton provider. It internally manages the scope using the ContextVar and generates a new instance for each "loop".
I have a GitHub repository template for new projects using FastAPI and the dependency_injection library. In this module I set up the engine and session, and I believe it can help you achieve your objective.
BTW, I recommend in your example taking advantage of DI and managing the engine
and sessionmaker
using providers (a Singleton for the engine and the result of the sessionmaker
), then using it as a builder with the ContextLocalSingleton)
As an example, here is a simple refactor:
from dependency_injector import providers
from dependency_injector.containers import DeclarativeContainer
from sqlalchemy import Engine, create_engine
from sqlalchemy.orm import Session, sessionmaker
class DatabaseContainer(containers.DeclarativeContainer):
engine = providers.Singleton(create_engine, url='...', pool_size=20, max_overflow=0)
session_maker = providers.Singleton(sessionmaker, bind=engine)
# Creates a new session in each FastAPI request
session = providers.ContextLocalSingleton(session_maker.provided.call())
Upvotes: 0