João

Race condition using FastAPI, SQLAlchemy and Asyncpg

I am having trouble writing data to a PostgreSQL database because of a race condition.

Basically, I need to insert a "Medico" into the database if it doesn't already exist.

However, during load tests I noticed that concurrent requests try to insert the same values at the same time, which raises errors.

The method below should first look for a Medico and, if it doesn't find one, create it. The problem appears when two or more requests run it at the same time: each one finds nothing, and then each one tries to save the same Medico to the database.
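To make the interleaving concrete, here is a minimal sketch of the same check-then-insert pattern, using an in-memory list as a stand-in for the table instead of PostgreSQL. The names `rows` and `busca_cria` and the key values are made up for illustration. It also hints at why `with_for_update()` may not help here: `SELECT ... FOR UPDATE` locks only the rows the query returns, so when no matching row exists yet there is nothing to lock.

```python
import asyncio

# In-memory stand-in for the medico table; a list, so duplicates stay visible.
rows = []


async def busca_cria(key):
    # Step 1: look for an existing row (the SELECT).
    found = key in rows
    # Yield to the event loop, as a real `await db.execute(...)` does while it
    # waits on the network. The other requests run their own SELECT here.
    await asyncio.sleep(0)
    # Step 2: insert only if the earlier SELECT saw nothing (the add + flush).
    if not found:
        rows.append(key)


async def main():
    rows.clear()
    key = ("12345", "CRM", "SP")  # made-up numero_registro, sigla_conselho, uf
    # Five concurrent "requests" for the same doctor.
    await asyncio.gather(*(busca_cria(key) for _ in range(5)))
    return len(rows)


inserted = asyncio.run(main())
print(inserted)  # more than one row for a single doctor
```

Every request finishes its lookup before any of them inserts, so each one concludes the doctor is missing.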

async def busca_cria_medico(self, venda_medico: dict) -> MedicoModel:
    """
    Check whether the doctor exists; create it if it does not.
    """
    # Look up the doctor by its identifying fields, requesting a row lock.
    medico_existente = await self.db.execute(
        select(MedicoModel)
        .filter(MedicoModel.numero_registro == venda_medico.numero_registro,
                MedicoModel.sigla_conselho == venda_medico.sigla_conselho,
                MedicoModel.uf == venda_medico.uf)
        .with_for_update()
    )

    medico_existente = medico_existente.scalars().first()

    if medico_existente is None:
        # No matching row was found, so create one and flush it.
        novo_medico = MedicoModel(
            nome=venda_medico.nome,
            numero_registro=venda_medico.numero_registro,
            uf=venda_medico.uf,
            sigla_conselho=venda_medico.sigla_conselho,
        )
        self.db.add(novo_medico)
        await self.db.flush()

        return novo_medico

    return medico_existente

This is how I'm calling the method:

async with self.db.begin_nested():
    medico = await self.busca_cria_medico(venda.medico)

Here is my engine and session factory:

engine = create_async_engine(URL, pool_size=40, pool_timeout=30)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine, class_=AsyncSession)

This is how I get my db session:

async def get_db():
    async with SessionLocal() as db:
        try:
            yield db
        finally:
            await db.close()

And this is the route handler:

@router.post('/realizar-venda', status_code=status.HTTP_200_OK)
async def realiza_venda(venda: RealizarVenda, request: Request, db: AsyncSession = Depends(get_db)):
    token = request.headers.get('Authorization').split(' ')[1]
    venda_service = VendaService(db=db)
    await venda_service.realiza_venda(venda=venda, token=token)
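For reference, one direction that might avoid the duplicate insert is to let the database decide: put a unique constraint on the identifying columns and insert with `ON CONFLICT ... DO NOTHING`, then read the row back. The sketch below uses the standard-library `sqlite3` module purely as a stand-in for PostgreSQL so that it runs on its own. The table layout, column types and sample values are assumptions, not the real schema. With SQLAlchemy on PostgreSQL, the corresponding construct would be `sqlalchemy.dialects.postgresql.insert(...).on_conflict_do_nothing(...)`, which I have not tested against this code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Assumed schema: the unique constraint is what makes the insert safe.
conn.execute(
    """
    CREATE TABLE medico (
        id INTEGER PRIMARY KEY,
        nome TEXT NOT NULL,
        numero_registro TEXT NOT NULL,
        sigla_conselho TEXT NOT NULL,
        uf TEXT NOT NULL,
        UNIQUE (numero_registro, sigla_conselho, uf)
    )
    """
)


def busca_cria_medico(conn, nome, numero_registro, sigla_conselho, uf):
    # Insert unless a row with the same unique key already exists.
    conn.execute(
        """
        INSERT INTO medico (nome, numero_registro, sigla_conselho, uf)
        VALUES (?, ?, ?, ?)
        ON CONFLICT (numero_registro, sigla_conselho, uf) DO NOTHING
        """,
        (nome, numero_registro, sigla_conselho, uf),
    )
    # Read back the row, whether this call or an earlier one created it.
    return conn.execute(
        """
        SELECT id, nome FROM medico
        WHERE numero_registro = ? AND sigla_conselho = ? AND uf = ?
        """,
        (numero_registro, sigla_conselho, uf),
    ).fetchone()


first = busca_cria_medico(conn, "Ana", "12345", "CRM", "SP")   # creates the row
second = busca_cria_medico(conn, "Ana", "12345", "CRM", "SP")  # finds the same row
total = conn.execute("SELECT COUNT(*) FROM medico").fetchone()[0]
```

The second call does not fail and does not add a row; both calls return the same record.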

Answers (0)
