Reputation: 815
I am currently writing a web app (Flask) using SQLAlchemy (on GAE, connecting to Google Cloud SQL's MySQL) and need to do bulk updates of a table. In short, a number of calculations are done, resulting in a single value that needs to be updated on thousands of objects. At the moment I'm doing it all in a transaction, but the flush/commit at the end still takes ages.
The table has an index on id and this is all carried out in a single transaction, so I believe I've avoided the usual mistakes, but it is still very slow.
INFO 2017-01-26 00:45:46,412 log.py:109] UPDATE wallet SET balance=%(balance)s WHERE wallet.id = %(wallet_id)s
2017-01-26 00:45:46,418 INFO sqlalchemy.engine.base.Engine ({'wallet_id': u'3c291a05-e2ed-11e6-9b55-19626d8c7624', 'balance': 1.8711760000000002}, {'wallet_id': u'3c352035-e2ed-11e6-a64c-19626d8c7624', 'balance': 1.5875759999999999}, {'wallet_id': u'3c52c047-e2ed-11e6-a903-19626d8c7624', 'balance': 1.441656}
From my understanding there is no way to do a true bulk UPDATE in SQL here, and the statement above ends up being sent to the server as multiple individual UPDATE statements, one per parameter set.
I've tried using Session.bulk_update_mappings(), but that doesn't seem to do anything: the updates never actually happen, and I'm not sure why. I can't find any examples of this method actually being used (including in the performance suite), so I'm not sure whether it's intended for this.
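For reference, my call looked roughly like this (per the docs, each mapping dict must include the primary key, and the keys are mapped attribute names; the updates list is just a placeholder for my calculation results):

# `updates` is a placeholder list of (Wallet, new_balance) pairs
# from my calculations; each dict must carry the primary key
db_session.bulk_update_mappings(
    Wallet,
    [{"id": w.id, "_balance": new_balance} for w, new_balance in updates],
)
db_session.commit()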
One technique I've seen discussed is doing a bulk insert into another table and then doing an UPDATE ... JOIN. I've tested it, like below, and it seems significantly faster.
# stage the new balances in the ledger table with a bulk insert
wallets = db_session.query(Wallet).all()
ledgers = [Ledger(id=w.id, amount=w._balance) for w in wallets]
db_session.bulk_save_objects(ledgers)
# apply them all at once with an UPDATE ... JOIN, then clear the staging table
db_session.execute('UPDATE wallet w JOIN ledger l ON w.id = l.id SET w.balance = l.amount')
db_session.execute('TRUNCATE ledger')
But the problem now is how to structure my code. I'm using the ORM, and I need to somehow not 'dirty' the original Wallet objects so that they don't get committed in the old way. I could just create these Ledger objects instead, keep a list of them around, and manually insert them at the end of my bulk operation, but that almost smells like I'm replicating some of the work of the ORM mechanism.
Is there a smarter way to do this? So far my brain is going down something like:
class Wallet(Base):
    ...
    _balance = Column(Float)
    ...

    @property
    def balance(self):
        # first check if we have a ledger of the same id
        # and return the amount in that, otherwise...
        return self._balance

    @balance.setter
    def balance(self, amount):
        l = Ledger(id=self.id, amount=amount)
        # add l to a list somewhere then process later
        # At the end of the transaction, do a bulk insert of Ledgers
        # and then do an UPDATE JOIN and TRUNCATE
As I said, this all seems to be fighting against the tools I (may) have. Is there a better way to handle this? Can I tap into the ORM mechanism to do it? Or is there an even better way to do the bulk updates?
EDIT: Or is there maybe something clever with events and sessions? Maybe before_flush?
EDIT 2: So I have tried to tap into the event machinery and now have this:
@event.listens_for(SignallingSession, 'before_flush')
def before_flush(session, flush_context, instances):
    ledgers = []
    for elem in session.dirty:
        if (session.is_modified(elem, include_collections=False)
                and isinstance(elem, Wallet)):
            # detach the Wallet so the ORM doesn't emit its own UPDATE for it
            session.expunge(elem)
            ledgers.append(Ledger(id=elem.id, amount=elem.balance))
    if ledgers:
        session.bulk_save_objects(ledgers)
        session.execute('UPDATE wallet w JOIN ledger l ON w.id = l.id SET w.balance = l.amount')
        session.execute('TRUNCATE ledger')
This seems pretty hacky and evil to me, but it appears to work OK. Any pitfalls, or better approaches?
-Matt
Upvotes: 4
Views: 15974
Reputation: 142298
Generally it is poor schema design to need to update thousands of rows frequently. That aside...
Plan A: Write ORM code that generates
START TRANSACTION;
UPDATE wallet SET balance = ... WHERE id = ...;
UPDATE wallet SET balance = ... WHERE id = ...;
UPDATE wallet SET balance = ... WHERE id = ...;
...
COMMIT;
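For instance (an untested sketch; `engine` and the `new_balances` dict are placeholders), one parameterized UPDATE sent with many parameter sets, all in a single transaction:

from sqlalchemy import text

# untested sketch; `engine` and `new_balances` ({id: balance}) are placeholders
with engine.begin() as conn:  # emits START TRANSACTION ... COMMIT
    conn.execute(
        text("UPDATE wallet SET balance = :balance WHERE id = :id"),
        [{"id": k, "balance": v} for k, v in new_balances.items()],
    )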
Plan B: Write ORM code that generates
CREATE TEMPORARY TABLE ToDo (
id ...,
new_balance ...
);
INSERT INTO ToDo -- either one row at a time, or a bulk insert
UPDATE wallet
JOIN ToDo USING(id)
SET wallet.balance = ToDo.new_balance; -- bulk update
(Check the syntax; test; etc.)
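For instance, a rough version through the question's db_session (untested; CHAR(36) assumes the UUID-string ids shown in the question's log, and new_balances is a placeholder dict of computed balances):

# untested sketch; CHAR(36) assumed from the UUID-style ids in the log,
# new_balances is a placeholder {wallet_id: new_balance} dict
db_session.execute("""
    CREATE TEMPORARY TABLE ToDo (
        id CHAR(36) NOT NULL PRIMARY KEY,
        new_balance FLOAT NOT NULL
    )
""")
db_session.execute(
    "INSERT INTO ToDo (id, new_balance) VALUES (:id, :new_balance)",
    [{"id": k, "new_balance": v} for k, v in new_balances.items()],
)
db_session.execute(
    "UPDATE wallet JOIN ToDo USING(id) SET wallet.balance = ToDo.new_balance"
)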
Upvotes: 1
Reputation: 20518
What you're essentially doing is bypassing the ORM in order to optimize the performance. Therefore, don't be surprised that you're "replicating the work the ORM is doing" because that's exactly what you need to do.
Unless you have a lot of places where you need to do bulk updates like this, I would recommend against the magical event approach; simply writing the explicit queries is much more straightforward.
What I recommend doing is using SQLAlchemy Core instead of the ORM to do the update:
from sqlalchemy import Table, Column, String, Float

ledger = Table(
    "ledger", db.metadata,
    # type should match wallet.id (the question's log shows UUID strings)
    Column("wallet_id", String(36), primary_key=True),
    Column("new_balance", Float),
    prefixes=["TEMPORARY"],
)

wallets = db.session.query(Wallet).all()

# figure out new balances
balance_map = {}
for w in wallets:
    balance_map[w.id] = calculate_new_balance(w)

# create temp table with balances we need to update
ledger.create(bind=db.session.get_bind())

# insert update data
db.session.execute(ledger.insert().values([{"wallet_id": k, "new_balance": v}
                                           for k, v in balance_map.items()]))

# perform update
db.session.execute(Wallet.__table__
                   .update()
                   .values(balance=ledger.c.new_balance)
                   .where(Wallet.__table__.c.id == ledger.c.wallet_id))

# drop temp table
ledger.drop(bind=db.session.get_bind())

# commit changes
db.session.commit()
Upvotes: 5