Reputation: 41
TL;DR
My application has two parts. The first part is an API that only serves data (it has multiple GET endpoints). The second is an event consumer that consumes from Pub/Sub and writes to the database. I use a sync DB session for the event consumer and an async DB session for the API. My code works perfectly fine, but my tests are failing. In my tests I insert data using a sync session, then try to fetch it with an async session. However, I get no data back when I call the API with the async client (which uses the async DB session). Can you help me find the problem?
Edit: I think I need to start the async engine from the savepoint that the sync engine last created after inserting all the data. Is there a way to pass a savepoint to another fixture?
I read multiple resources and thought that if I created client and db fixtures for both sync and async, used the recipe for "Joining a Session into an External Transaction", added my data with the sync db session, and then queried through the async client backed by the async db session, I would get the proper result. But it seems I am missing something. After inserting I add pdb.set_trace() and check the db, and the data appears to be inserted. However, after switching to the async client the db seems empty, since my query results are empty. You can find the code I used below. I think the problem is related to my configuration in conftest.py; if you need more code or any extra information I will share it right away!
conftest.py
import sqlalchemy as sa
from fastapi.testclient import TestClient
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from httpx import AsyncClient
import pytest


@pytest.fixture(scope="session")
def engine(config: TestConfig):
    return sa.create_engine(config.DATABASE_URL)


@pytest.fixture(scope="session")
def async_engine(config: TestConfig):
    return create_async_engine(
        config.ASYNC_DATABASE_URL, pool_size=10, echo=True, max_overflow=10
    )


@pytest.fixture(scope="session")
def TestingSessionLocal(engine):
    return sessionmaker(
        autocommit=False,
        autoflush=False,
        bind=engine,
    )


@pytest.fixture(scope="session")
def TestingSessionLocalAsync(async_engine):
    return sessionmaker(
        bind=async_engine,
        autoflush=False,
        future=True,
        class_=AsyncSession,
        expire_on_commit=False,
    )


@pytest.fixture
def db(engine, TestingSessionLocal):
    connection = engine.connect()
    transaction = connection.begin()
    session = TestingSessionLocal(bind=connection)
    session.begin_nested()

    @sa.event.listens_for(session, "after_transaction_end")
    def end_savepoint(session, transaction):
        if transaction.nested and not transaction._parent.nested:
            session.begin_nested()

    yield session
    # Rollback the overall transaction, restoring the state before the test ran.
    session.close()
    transaction.rollback()
    connection.close()


@pytest.fixture
async def async_db(async_engine, TestingSessionLocalAsync):
    async with async_engine.connect() as conn:
        await conn.begin()
        await conn.begin_nested()
        async_session = TestingSessionLocalAsync()

        @sa.event.listens_for(async_session.sync_session, "after_transaction_end")
        def end_savepoint(session, transaction):
            if conn.closed:
                return
            if not conn.in_nested_transaction():
                conn.sync_connection.begin_nested()

        yield async_session
        await async_session.close()
        await conn.close()


@pytest.fixture
def app(config):
    return create_app(config)


@pytest.fixture
def client(app, db):
    def override_get_db():
        yield db

    app.dependency_overrides[get_db] = override_get_db
    yield TestClient(app)
    del app.dependency_overrides[get_db]


@pytest.fixture()
async def async_client(app, async_db):
    def override_get_async_db():
        yield async_db

    app.dependency_overrides[get_async_db] = override_get_async_db
    async with AsyncClient(
        app=app,
        base_url="http://localhost:8080",
        headers={"Content-Type": "application/json"},
    ) as client:
        yield client
    del app.dependency_overrides[get_async_db]
test.py
@pytest.mark.asyncio
async def test_thing(async_client, db: Session):
    thing = Thing()
    db.add(thing)
    db.flush()
    db.refresh(thing)
    # db.commit()  # Tried adding db.commit() here, result has not changed
    res1 = await async_db.execute(select(thing.name).select_from(thing))
    res2 = db.execute(select(thing.name).select_from(thing))
    pdb.set_trace()
    get_thing = await async_client.get(
        f"/thing/{thing.id}",
    )
    assert len(get_thing) == 1
pdb result
It basically fails at the assert line in test.py, outputting len(get_thing) == 0.
Upvotes: 4
Views: 2483
Reputation: 3883
You definitely have to call db.commit() after adding the thing to the database; otherwise any other connection (either synchronous or asynchronous) will not be able to retrieve the new data, because it is never actually saved to the database: db.flush() doesn't persist data, it only transfers it from the ORM to the database transaction buffer. At the same time, in your concrete example you are still able to see the newly added data in the synchronous query, just because you are reading from the current transaction's buffer and hence have access to uncommitted data. This behavior is easy to confirm by querying the database after the test runs with any other tool, e.g. psql, to verify that no new data exists in the table.
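For illustration only, here is a hedged sketch of the test with commit in place of flush/refresh. It reuses the question's own names (Thing, async_client, db); the status-code assertion is my assumption about what the endpoint returns, not something stated in the question:

@pytest.mark.asyncio
async def test_thing(async_client, db: Session):
    thing = Thing()
    db.add(thing)
    # commit() actually writes the row to the database, so the separate
    # async connection used by the API can see it
    db.commit()

    response = await async_client.get(f"/thing/{thing.id}")
    # assuming the endpoint returns the stored row as JSON
    assert response.status_code == 200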
The second problem is that even after adding the db.commit() statement you are not able to retrieve the newly added rows from the table. I assume this happens because you explicitly start a transaction in the fixture with conn.begin() and conn.begin_nested(), so that transaction starts before the data gets committed, and since the default isolation level in Postgres is "Read Committed" you are not able to see any data that was committed after the transaction started.
To sum up:

- drop flush and refresh, replace them with commit in the test
- drop begin and begin_nested, let SQLAlchemy silently do its magic

Upvotes: 6