Matt Hamilton

Reputation: 815

SQLAlchemy bulk update strategies

I am currently writing a web app (Flask) using SQLAlchemy (on GAE, connecting to Google's cloud MySQL) and need to do bulk updates of a table. In short, a number of calculations are done, resulting in a single value that needs to be updated on thousands of objects. At the moment I'm doing it all in a transaction, but the flush/commit at the end still takes ages.

The table has an index on id and this is all carried out in a single transaction. So I believe I've avoided the usual mistakes, but it is still very slow.

INFO     2017-01-26 00:45:46,412 log.py:109] UPDATE wallet SET balance=%(balance)s WHERE wallet.id = %(wallet_id)s
2017-01-26 00:45:46,418 INFO sqlalchemy.engine.base.Engine ({'wallet_id': u'3c291a05-e2ed-11e6-9b55-19626d8c7624', 'balance': 1.8711760000000002}, {'wallet_id': u'3c352035-e2ed-11e6-a64c-19626d8c7624', 'balance': 1.5875759999999999}, {'wallet_id': u'3c52c047-e2ed-11e6-a903-19626d8c7624', 'balance': 1.441656}

From my understanding there is actually no way to do a bulk update in SQL, and the statement above ends up being sent to the server as multiple UPDATE statements.
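
For context, the log line above (one statement followed by a tuple of parameter dictionaries) has the shape of a DBAPI executemany() call. Here is a minimal sketch of that call shape, using the standard-library sqlite3 module as a stand-in for the MySQL driver. The table layout, ids, and values are assumptions made up for illustration.

```python
import sqlite3

# In-memory SQLite stands in for the real MySQL database (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE wallet (id TEXT PRIMARY KEY, balance REAL)")
conn.executemany(
    "INSERT INTO wallet (id, balance) VALUES (?, ?)",
    [("w1", 0.0), ("w2", 0.0), ("w3", 0.0)],
)

# One statement, many parameter sets: the UPDATE is executed once per set.
params = [
    {"wallet_id": "w1", "balance": 1.87},
    {"wallet_id": "w2", "balance": 1.59},
    {"wallet_id": "w3", "balance": 1.44},
]
conn.executemany(
    "UPDATE wallet SET balance = :balance WHERE id = :wallet_id", params
)
conn.commit()

updated = dict(conn.execute("SELECT id, balance FROM wallet ORDER BY id"))
```

Whether the driver batches these on the wire or sends them one by one depends on the driver; the sketch only shows the call shape.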

I've tried using Session.bulk_update_mappings() but that doesn't seem to actually do anything :( Not sure why, but the updates never actually happen. I can't see any examples of this method actually being used (including in the performance suite) so not sure if it is intended to be used.
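
For reference, a minimal sketch of how Session.bulk_update_mappings() is usually called. It assumes SQLAlchemy 1.4 or later, an in-memory SQLite database, and a simplified Wallet model; none of these come from the app described here. Each dictionary has to carry the primary key so the row can be matched, and the change only becomes permanent after a commit. It does not explain why the call above appeared to do nothing.

```python
from sqlalchemy import Column, Float, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Wallet(Base):
    # Simplified stand-in for the real model (assumption).
    __tablename__ = "wallet"
    id = Column(String(36), primary_key=True)
    balance = Column(Float)


engine = create_engine("sqlite://")  # in-memory database for the sketch
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([Wallet(id="a", balance=1.0), Wallet(id="b", balance=2.0)])
    session.commit()

    # Keys are mapped attribute names; the primary key identifies the row.
    session.bulk_update_mappings(
        Wallet,
        [{"id": "a", "balance": 10.0}, {"id": "b", "balance": 20.0}],
    )
    session.commit()

    balances = {w.id: w.balance for w in session.query(Wallet).all()}
```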

One technique I've seen discussed is doing a bulk insert into another table and then doing an UPDATE JOIN. I've given it a test, like below, and it seems to be significantly faster.

wallets = db_session.query(Wallet).all()
ledgers = [ Ledger(id=w.id, amount=w._balance) for w in wallets ]
db_session.bulk_save_objects(ledgers)
db_session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
db_session.execute('TRUNCATE ledger')

But the problem now is how to structure my code. I'm using the ORM and I need to somehow not 'dirty' the original Wallet objects so that they don't get committed in the old way. I could just create these Ledger objects instead and keep a list of them about and then manually insert them at the end of my bulk operation. But that almost smells like I'm replicating some of the work of the ORM mechanism.

Is there a smarter way to do this? So far my brain is going down something like:

class Wallet(Base):
    ...
    _balance = Column(Float)
    ...

    @property
    def balance(self):
        # first check if we have a ledger of the same id
        # and return the amount in that, otherwise...
        return self._balance

    @balance.setter
    def balance(self, amount):
        l = Ledger(id=self.id, amount=amount)
        # add l to a list somewhere then process later

# At the end of the transaction, do a bulk insert of Ledgers
# and then do an UPDATE JOIN and TRUNCATE
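
The "add it to a list somewhere, then process later" idea can be sketched without the ORM at all. The BalanceLedger class and its method names below are hypothetical, invented only to show the buffering step; the rows it returns would still have to be bulk-inserted and joined as described above.

```python
class BalanceLedger:
    """Hypothetical helper: buffers new balances keyed by wallet id."""

    def __init__(self):
        self._pending = {}

    def set_balance(self, wallet_id, amount):
        # A later write for the same wallet overwrites the earlier one.
        self._pending[wallet_id] = amount

    def get_balance(self, wallet_id, stored_balance):
        # Prefer a buffered value; otherwise use the value loaded from the DB.
        return self._pending.get(wallet_id, stored_balance)

    def drain(self):
        # Return rows shaped for a bulk insert and reset the buffer.
        rows = [{"id": k, "amount": v} for k, v in self._pending.items()]
        self._pending.clear()
        return rows


ledger = BalanceLedger()
ledger.set_balance("w1", 1.5)
ledger.set_balance("w1", 2.0)   # replaces the 1.5 above
ledger.set_balance("w2", 3.0)
buffered = ledger.get_balance("w1", stored_balance=0.0)
rows = ledger.drain()
```

Keying the buffer by wallet id means each wallet contributes at most one row to the bulk insert, however many times its balance is recalculated.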

As I said, this all seems to be fighting against the tools I (may) have. Is there a better way to be handling this? Can I tap into the ORM mechanism to be doing this? Or is there an even better way to do the bulk updates?

EDIT: Or is there maybe something clever with events and sessions? Maybe before_flush?

EDIT 2: So I have tried to tap into the event machinery and now have this:

@event.listens_for(SignallingSession, 'before_flush')
def before_flush(session, flush_context, instances):
    ledgers = []

    if session.dirty:
        for elem in session.dirty:
            if session.is_modified(elem, include_collections=False):
                if isinstance(elem, Wallet):
                    session.expunge(elem)
                    ledgers.append(Ledger(id=elem.id, amount=elem.balance))

    if ledgers:
        session.bulk_save_objects(ledgers)
        session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
        session.execute('TRUNCATE ledger')

Which seems pretty hacky and evil to me, but appears to work OK. Any pitfalls, or better approaches?

-Matt

Upvotes: 4

Views: 15974

Answers (2)

Rick James

Reputation: 142298

Generally it is poor schema design to need to update thousands of rows frequently. That aside...

Plan A: Write ORM code that generates

START TRANSACTION;
UPDATE wallet SET balance = ... WHERE id = ...;
UPDATE wallet SET balance = ... WHERE id = ...;
UPDATE wallet SET balance = ... WHERE id = ...;
...
COMMIT;

Plan B: Write ORM code that generates

CREATE TEMPORARY TABLE ToDo (
    id ...,
    new_balance ...
);
INSERT INTO ToDo -- either one row at a time, or a bulk insert
UPDATE wallet
    JOIN ToDo USING(id)
    SET wallet.balance = ToDo.new_balance;  -- bulk update

(Check the syntax; test; etc.)
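
A runnable sketch of Plan B, under the assumption that SQLite (via the standard-library sqlite3 module) is an acceptable stand-in for MySQL. SQLite has no UPDATE ... JOIN, so the sketch swaps in a correlated subquery for the join; the staging-table steps are otherwise the same. Table contents are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE wallet (id TEXT PRIMARY KEY, balance REAL)")
conn.executemany(
    "INSERT INTO wallet VALUES (?, ?)",
    [("w1", 1.0), ("w2", 2.0), ("w3", 3.0)],
)

# Only some wallets get a new balance.
new_balances = [("w1", 10.0), ("w3", 30.0)]

# Stage the new values in a temporary table with one bulk insert.
conn.execute("CREATE TEMPORARY TABLE todo (id TEXT PRIMARY KEY, new_balance REAL)")
conn.executemany("INSERT INTO todo VALUES (?, ?)", new_balances)

# Single set-based UPDATE; the subquery replaces MySQL's JOIN syntax.
conn.execute(
    """
    UPDATE wallet
    SET balance = (SELECT new_balance FROM todo WHERE todo.id = wallet.id)
    WHERE id IN (SELECT id FROM todo)
    """
)
conn.execute("DROP TABLE todo")
conn.commit()

result = dict(conn.execute("SELECT id, balance FROM wallet ORDER BY id"))
```

The WHERE clause matters here: without it, wallets missing from the staging table would have their balance set to NULL.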

Upvotes: 1

univerio

Reputation: 20518

What you're essentially doing is bypassing the ORM in order to optimize the performance. Therefore, don't be surprised that you're "replicating the work the ORM is doing" because that's exactly what you need to do.

Unless you have a lot of places where you need to do bulk updates like this, I would recommend against the magical event approach; simply writing the explicit queries is much more straightforward.

What I recommend doing is using SQLAlchemy Core instead of the ORM to do the update:

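from sqlalchemy import Column, Float, Integer, Table

# temporary staging table for the new balances
ledger = Table("ledger", db.metadata,
    Column("wallet_id", Integer, primary_key=True),
    Column("new_balance", Float),
    prefixes=["TEMPORARY"],
)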


wallets = db.session.query(Wallet).all()

# figure out new balances
balance_map = {}
for w in wallets:
    balance_map[w.id] = calculate_new_balance(w)

# create temp table with balances we need to update
ledger.create(bind=db.session.get_bind())

# insert update data
db.session.execute(ledger.insert().values([{"wallet_id": k, "new_balance": v}
                                           for k, v in balance_map.items()]))

# perform update
db.session.execute(Wallet.__table__
                         .update()
                         .values(balance=ledger.c.new_balance)
                         .where(Wallet.__table__.c.id == ledger.c.wallet_id))

# drop temp table
ledger.drop(bind=db.session.get_bind())

# commit changes
db.session.commit()

Upvotes: 5
