Faris Dewantoro

Reputation: 1775

Python Dependency Injector: passing a single SQLAlchemy DB connection per request

I am trying to implement DI in my console app using dependency_injector.

My requirement is that every request uses the same DB connection across all of its dependencies. Each request should create one new DB connection, and only one, so that I can make the request transactional (ACID).

Here is my code. It still creates a new DB connection for each dependency. I don't want to use a Singleton, since then only one connection would ever be used.

Database.py

from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker

engine_mysql = create_engine(DB_MYSQL_DSN, pool_size=20, max_overflow=0)
session_mysql = sessionmaker(autocommit=False, autoflush=False, bind=engine_mysql)

class Database:
    _session_mysql: Session
    _session_psql: Session

    def __init__(self) -> None:
        print(100 * "=")
        print("SESSION CREATED")
        print(100 * "=")
        self._session_mysql = session_mysql()

    def get_mysql_session(self):
        print(100 * "=")
        print("SESSION CALL MYSQL")
        print(100 * "=")
        return self._session_mysql

DatabaseContainer.py

class DatabaseContainer(containers.DeclarativeContainer):

    database = providers.Factory(Database)
    db_mysql = providers.Singleton(database.provided.get_mysql_session)

RepositoryContainer.py

class RepositoryContainer(containers.DeclarativeContainer):
    database = providers.DependenciesContainer()
    transaction = providers.Factory(TransactionRepository, session=database.db_mysql)
    transaction_package = providers.Factory(
      TransactionPackageRepository,
      session=database.db_psql,
    )

ServiceContainer.py

class ServiceContainer(containers.DeclarativeContainer):
    repository = providers.DependenciesContainer()
    database = providers.DependenciesContainer()

    order = providers.Factory(
        OrderService,
        # database
        db_conn=database.db_mysql,
        # repository
        transaction_repo=repository.transaction,
        transaction_package_repo=repository.transaction_package,
    )

AppContainer.py

class AppContainer(containers.DeclarativeContainer):

    database = providers.Container(DatabaseContainer)

    repository = providers.Container(RepositoryContainer, database=database)
    
    service = providers.Container(
        ServiceContainer,
        repository=repository,
        database=database
    )

Client Code

class OrderWatcherJob(BaseJob):
    def __init__(self):
        self._logger = logging.getLogger(__name__)
        self.func = self.execute_job

    @inject
    def execute_job(
        self,
        order_service: OrderService = Provide[AppContainer.service.order],
        db_mysql:Session = Provide[AppContainer.database.db_mysql]
    ):

        try:
            order_service.execute_something()
            db_mysql.commit()
            #or order_service.db_conn.commit() 
        except Exception as ex:
            self._logger.error(ex)
            db_mysql.rollback()
            #or order_service.db_conn.rollback() 
        finally:
            db_mysql.close()
            #or order_service.db_conn.close()

So, how do I make this use only one DB connection per request? I mean that when I inject my OrderService, the same DB connection should be used by TransactionRepo, TransactionPackageRepo, and db_conn.

Upvotes: 5

Views: 1260

Answers (1)

Aran Moncusi Ramirez

Reputation: 149

To use only one session per request (using FastAPI, where the scope can follow the asyncio loop), the library provides the ContextLocalSingleton provider. Internally it manages the scope with a ContextVar and creates a new instance for each context (each "loop").
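
To illustrate the scoping idea, here is a minimal sketch that uses only the standard library. The ContextLocal class and FakeSession are hypothetical stand-ins written for this example; this is not the library's implementation, only the ContextVar mechanism described above as I understand it. Each asyncio task runs in its own copy of the context, so every simulated request gets its own instance, while repeated lookups inside one request return the same object.

```python
import asyncio
import contextvars


class ContextLocal:
    """Hypothetical stand-in: caches one instance per execution context."""

    def __init__(self, factory):
        self._factory = factory
        # default=None means "nothing created yet in this context"
        self._storage = contextvars.ContextVar("context_local_instance", default=None)

    def __call__(self):
        instance = self._storage.get()
        if instance is None:
            instance = self._factory()
            self._storage.set(instance)
        return instance


class FakeSession:
    """Placeholder for a SQLAlchemy Session (assumption for this sketch)."""


session = ContextLocal(FakeSession)


async def handle_request():
    # Two lookups inside the same task (the same "request")
    first = session()
    await asyncio.sleep(0)
    second = session()
    return first, second


async def main():
    # gather() wraps each coroutine in its own task, each with a copied context
    return await asyncio.gather(handle_request(), handle_request())


(a1, a2), (b1, b2) = asyncio.run(main())
print(a1 is a2, b1 is b2, a1 is b1)
```

Within one task both lookups return the same object, and the two tasks do not share it.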

I have a GitHub repository template for new projects using FastAPI and the dependency_injector library. In this module I set up the engine and session, and I believe it can help you achieve your objective.

BTW, in your example I recommend taking advantage of DI and managing the engine and sessionmaker with providers (a Singleton for the engine and another for the result of sessionmaker), then using the latter as the builder for the ContextLocalSingleton.

As an example, here is a simple refactor:

from dependency_injector import providers
from dependency_injector.containers import DeclarativeContainer

from sqlalchemy import Engine, create_engine
from sqlalchemy.orm import Session, sessionmaker


class DatabaseContainer(DeclarativeContainer):

    engine = providers.Singleton(create_engine, url='...', pool_size=20, max_overflow=0)
    
    session_maker = providers.Singleton(sessionmaker, bind=engine)

    #  Creates a new session in each FastAPI request
    session = providers.ContextLocalSingleton(session_maker.provided.call())
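
Since the question is about a console app rather than FastAPI, the same idea can probably be applied by running each job inside its own context. The sketch below uses only the standard library. get_session, FakeSession, and run_job are hypothetical names for this example, and the repository and service classes are reduced to the parts that hold a session. If ContextLocalSingleton is indeed backed by a ContextVar as described, it should behave similarly when the job is started through contextvars.copy_context().run, but I have not verified that against the library here.

```python
import contextvars

# Hypothetical stand-in for the per-context session storage
_current_session = contextvars.ContextVar("current_session", default=None)


class FakeSession:
    """Placeholder for a SQLAlchemy Session; it only records calls."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")

    def close(self):
        self.calls.append("close")


def get_session():
    """Return the session of the current context, creating it on first use."""
    session = _current_session.get()
    if session is None:
        session = FakeSession()
        _current_session.set(session)
    return session


class TransactionRepository:
    def __init__(self):
        self.session = get_session()


class OrderService:
    def __init__(self):
        self.db_conn = get_session()
        self.transaction_repo = TransactionRepository()


def execute_job():
    service = OrderService()
    session = get_session()
    try:
        # ... the real work would happen here ...
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
    return service


def run_job():
    # A fresh copy of the context per job: values set inside do not leak out
    return contextvars.copy_context().run(execute_job)


job_a = run_job()
job_b = run_job()
```

Inside one job the service and its repository share a single session, and the next job starts with a new one.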


Upvotes: 0
