ljiatu

Reputation: 571

SQLAlchemy with_for_update reads stale data

I'm writing a function that's responsible for updating account balances. In order to prevent concurrent updates, I first use with_for_update() to grab a lock on the accounts, calculate the amounts, update the balances, and then commit the session. To simulate concurrent requests, I spawn two processes and run the function once in each. Here's the code for calculating and updating the balances:

session = create_db_session(db_engine)()
session.connection(execution_options={'isolation_level': 'SERIALIZABLE'})

print("&" * 80)
print(f"{process_number} entering!")
print("&" * 80)

accounts = (
    session.query(Account)
    .filter(Account.id.in_([some account IDs]))
    .with_for_update()
    .populate_existing()
    .all()
)

print("*" * 80)
print(f"{process_number} got here!")
for account in accounts:
    print(
        f"Account version: {account.version}. Name: {account.name}. Balance: {account.balance}"
    )
    print(hex(id(session)))
    print("*" * 80)

# Calculate the total amount outstanding by account.
for account in accounts:
    total_amount = _calculate_total_amount()
    if account.balance >= total_amount:
        # For accounts with sufficient balance, deduct the amount from the balance.
        account.balance -= total_amount
    else:
        # Otherwise, save them for notification. Code omitted.
        pass

print("-" * 80)
print(f"{process_number} committing!")
for account in accounts:
    print(
        f"Account version: {account.version}. Name: {account.name}. Balance: {account.balance}"
    )
    print("-" * 80)
session.commit()

Here's the output:

&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
0 entering!
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
1 entering!
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
********************************************************************************
0 got here!
Account version: 1. Name: Phi's Account. Balance: 20000.000000
0x7fcb65d7e0d0
********************************************************************************
--------------------------------------------------------------------------------
0 committing!
Account version: 1. Name: Phi's Account. Balance: 19930.010000
--------------------------------------------------------------------------------
********************************************************************************
1 got here!
Account version: 1. Name: Phi's Account. Balance: 20000.000000
0x7fcb65f930a0
********************************************************************************
--------------------------------------------------------------------------------
1 committing!
Account version: 1. Name: Phi's Account. Balance: 19930.010000
--------------------------------------------------------------------------------

0 and 1 are process numbers, and the hexadecimal number is the id of the session. You can see that the lock worked (process 0 blocked 1 until 0 committed), but 1 read stale data: the balance should have been 19930.01, not 20000, and in the output for process 1, the "Account version" should have been 2, not 1.

I've tried using populate_existing() with no luck, although I suspect it wasn't going to help anyway, since the two sessions are distinct and the session for process 1 shouldn't have populated anything until the lock was released by process 0. I've also tried the "repeatable read" and "serializable" isolation levels, expecting an exception to be thrown in process 1 due to the concurrent update (a read/write dependency between the transactions), but nothing happened.

It's also interesting to note that the behavior is not consistent. Things work correctly when I run the block of code above locally, but almost never work when I build a Docker container with all the code and run it there. There is no difference in the package versions. I'm using Postgres and psycopg2.

I'm banging my head against the wall now trying to figure out what is happening. I feel like maybe I overlooked something simple. Any ideas?

Upvotes: 4

Views: 2643

Answers (1)

Erwin Brandstetter

Reputation: 656501

FOR UPDATE would do the trick. The manual:

FOR UPDATE causes the rows retrieved by the SELECT statement to be locked as though for update. This prevents them from being locked, modified or deleted by other transactions until the current transaction ends. That is, other transactions that attempt UPDATE, DELETE, SELECT FOR UPDATE, SELECT FOR NO KEY UPDATE, SELECT FOR SHARE or SELECT FOR KEY SHARE of these rows will be blocked until the current transaction ends;

Bold emphasis mine.

And that's exactly what SQLAlchemy's with_for_update() does. The manual:

When called with no arguments, the resulting SELECT statement will have a FOR UPDATE clause appended.
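
To see it, here is a minimal sketch (Account and session are assumed from the code in the question): printing the query shows the clause in the compiled SQL.

# Printing the query renders the SELECT that with_for_update() builds;
# the compiled statement ends with "FOR UPDATE".
query = session.query(Account).filter(Account.id == 1).with_for_update()
print(str(query))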

However, this is redundant effort while operating with SERIALIZABLE snapshot isolation like you do. The manual:

This level emulates serial transaction execution for all committed transactions; as if transactions had been executed one after another, serially, rather than concurrently.

So your code is safe against race conditions, redundantly. Either use FOR UPDATE (recommended!), or use SERIALIZABLE transactions. The latter is typically substantially more expensive. And you need to prepare for serialization failures (not in your displayed code). The manual:

... like the Repeatable Read level, applications using this level must be prepared to retry transactions due to serialization failures.
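
If you stay on SERIALIZABLE, wrap the transaction in a retry loop. A rough sketch, where run_transfer() stands in for the balance-updating block from the question:

from sqlalchemy.exc import OperationalError

# Retry a few times on serialization failures; Postgres reports them
# with SQLSTATE 40001, exposed by psycopg2 as exc.orig.pgcode.
for attempt in range(3):
    try:
        run_transfer(session)
        break
    except OperationalError as exc:
        if getattr(exc.orig, "pgcode", None) == "40001":
            session.rollback()
            continue
        raise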

The elephant in the room: did you actually write to the DB? session.commit() may have failed after "mission accomplished" was printed prematurely.
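
A quick way to check from the application side, as a sketch (account_id is a placeholder; the other names come from the question's code): re-read the row in a brand-new session after the commit.

# Post-commit check in a fresh session to confirm the UPDATE landed.
check_session = create_db_session(db_engine)()
fresh = check_session.query(Account).get(account_id)
print(f"version={fresh.version}, balance={fresh.balance}")
check_session.close()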

Check the DB log for serialization failures or any other exception. If you (unsurprisingly) find serialization failures, the simple solution is to switch to the (default!) READ COMMITTED isolation level. Your manual locking already does the job.
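
For example, a sketch of pinning the default level when the engine is created (db_url is a placeholder for your connection string); the session.connection(execution_options=...) call can then be dropped entirely:

from sqlalchemy import create_engine

# Run the same FOR UPDATE code at the (default) READ COMMITTED level.
db_engine = create_engine(db_url, isolation_level="READ COMMITTED")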

Upvotes: 3
