jhaiduce

Reputation: 438

How do I manually commit a sqlalchemy database transaction inside a pyramid web app?

I have a Pyramid web app that needs to run a Celery task after committing changes to a sqlalchemy database. I know I can do this using request.tm.get().addAfterCommitHook(). However, that doesn't work for me because I also need to use the task_id of the celery task inside the view. Therefore I need to commit changes to the database before I call task.delay() on my Celery task.

The zope.sqlalchemy documentation says that I can commit manually using transaction.commit(). However, this does not work for me: the Celery task runs before the changes are committed to the database, even though I call transaction.commit() before I call task.delay().

My Pyramid view code looks like this:

ride=appstruct_to_ride(dbsession,appstruct)
dbsession.add(ride)

# Flush dbsession so ride gets an id assignment
dbsession.flush()

# Store ride id
ride_id=ride.id
log.info('Created ride {}'.format(ride_id))

# Commit ride to database
import transaction
transaction.commit()

# Queue a task to update ride's weather data
from ..processing.weather import update_ride_weather
update_weather_task=update_ride_weather.delay(ride_id)

url = self.request.route_url('rides')
return HTTPFound(
    url,
    content_type='application/json',
    charset='',
    text=json.dumps(
        {'ride_id':ride_id,
         'update_weather_task_id':update_weather_task.task_id}))

My celery task looks like this:

@celery.task(bind=True,ignore_result=False)
def update_ride_weather(self,ride_id, train_model=True):

    from ..celery import session_factory
    
    logger.debug('Received update weather task for ride {}'.format(ride_id))

    dbsession=session_factory()
    dbsession.expire_on_commit=False

    with transaction.manager:
        ride=dbsession.query(Ride).filter(Ride.id==ride_id).one()

The celery task fails with NoResultFound:

  File "/app/cycling_data/processing/weather.py", line 478, in update_ride_weather
    ride=dbsession.query(Ride).filter(Ride.id==ride_id).one()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3282, in one
    raise orm_exc.NoResultFound("No row was found for one()")

When I inspect the database after the fact, I see that the record was in fact created, after the celery task ran and failed. So this means that transaction.commit() did not commit the transaction as expected, but changes were instead committed automatically by the zope.sqlalchemy machinery after the view returned. How do I commit a transaction manually inside my view code?

Upvotes: 0

Views: 1140

Answers (1)

Michael Merickel

Reputation: 23331

request.tm is defined by pyramid_tm and could be the threadlocal transaction.manager object or a per-request object, depending on how you've configured pyramid_tm (look for pyramid_tm.manager_hook being defined somewhere to determine which one is being used).

Your question is tricky because whatever you do should fit into pyramid_tm and how it expects things to operate. Specifically, it plans to control a transaction around the lifecycle of the request, so committing that transaction early is not a good idea. pyramid_tm is trying to provide a failsafe ability to roll back the entire request if a failure occurs anywhere in the request's lifecycle, not just in your view callable.

Option 1:

Commit early anyway. If you do this, failures after the commit cannot roll back the committed data, so you can end up with requests that are only partially committed. That is what you asked for, though, so the answer is to use request.tm.commit(), probably followed by request.tm.begin() to start a new transaction for any subsequent changes. You'll also need to be careful not to share SQLAlchemy-managed objects across that boundary, such as request.user, because they need to be refreshed or merged into the new transaction (by default SQLAlchemy's identity cache cannot trust data loaded in a different transaction, because that's just how isolation levels work).
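A minimal sketch of Option 1, assuming request.tm is the transaction manager that pyramid_tm attached to the request. The helper name commit_then_enqueue and its enqueue parameter are hypothetical, introduced only for illustration; in the question's code, enqueue would be update_ride_weather.delay.

```python
def commit_then_enqueue(request, dbsession, ride, enqueue):
    """Commit the pyramid_tm transaction early, then queue the task.

    `enqueue` is a hypothetical callable standing in for something like
    update_ride_weather.delay; it must return an object with a task_id.
    """
    dbsession.add(ride)
    dbsession.flush()      # flush so the ride gets an id assignment
    ride_id = ride.id      # copy the plain value before the commit

    request.tm.commit()    # the row is now visible to other connections
    request.tm.begin()     # fresh transaction for the rest of the request

    # Do not reuse `ride` (or other loaded objects) past this point
    # without refreshing or re-querying them in the new transaction.
    task = enqueue(ride_id)
    return {'ride_id': ride_id, 'update_weather_task_id': task.task_id}
```

Only the primitive ride_id crosses the commit boundary here, which sidesteps the stale-object problem described above.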

Option 2:

Start a separate transaction just for the data you want to commit early. Assuming you're not using any threadlocals such as transaction.manager or scoped_session, you can probably start your own transaction and commit it without touching the dbsession that is being controlled by pyramid_tm. Some generic code that works with the pyramid-cookiecutter-starter project structure could be:

import transaction

from myapp.models import get_tm_session

tmp_tm = transaction.TransactionManager(explicit=True)
with tmp_tm:
    dbsession_factory = request.registry['dbsession_factory']
    tmp_dbsession = get_tm_session(dbsession_factory, tmp_tm)
    # ... do stuff with tmp_dbsession that is committed in this with-statement
    ride = appstruct_to_ride(tmp_dbsession, appstruct)
    # do not use this ride object outside of the with-statement
    tmp_dbsession.add(ride)
    tmp_dbsession.flush()
    ride_id = ride.id

# we are now committed so go ahead and start your background worker
update_weather_task = update_ride_weather.delay(ride_id)

# maybe you want the ride object outside of the tmp_dbsession;
# dbsession here is the request's session that pyramid_tm controls
ride = dbsession.query(Ride).filter(Ride.id==ride_id).one()

return {...}

This isn't bad - probably about the best you can do as far as failure-modes go without hooking celery into the pyramid_tm-controlled dbsession.
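Both options come down to the same ordering rule: the row has to be committed before the worker's own connection looks for it. A small illustration of that rule, using the standard library's sqlite3 as a stand-in for the real database (the ride table and the visible_rows and demo helpers are made up for this sketch and are not part of the app):

```python
import os
import sqlite3
import tempfile


def visible_rows(path):
    """Count rides as seen from a brand-new connection (the 'worker')."""
    conn = sqlite3.connect(path)
    try:
        return conn.execute("SELECT COUNT(*) FROM ride").fetchone()[0]
    finally:
        conn.close()


def demo():
    """Return the row counts a second connection sees before and after commit."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "rides.db")
        writer = sqlite3.connect(path)  # plays the role of the view's session
        try:
            writer.execute("CREATE TABLE ride (id INTEGER PRIMARY KEY)")
            writer.commit()

            # Insert inside an open transaction, like flush() without commit.
            writer.execute("INSERT INTO ride (id) VALUES (1)")
            before = visible_rows(path)  # the worker cannot see the row yet

            writer.commit()
            after = visible_rows(path)   # now the row is visible
        finally:
            writer.close()
    return before, after
```

Calling demo() returns (0, 1): the second connection finds nothing until the writer commits, which is the same situation that produced NoResultFound in the Celery task.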

Upvotes: 2
