Reputation: 14900
All,
I'm reading a csv file and adding the data to a MySQL DB with sqlalchemy. One of the tables is the address table, which is only supposed to hold unique addresses. There is a relationship between these addresses and another table of "statements" which have a foreign key field of the address id.
So, for each row in my data file, I create a new statement obj, then get the id for the associated address. If the address already exists, that id is returned. Otherwise, I create a new address obj and return that id. This is done using the code below, adapted from this SO question.
def get_or_create(self, model, rec):
    instance = self.session.query(model).filter_by(**dict(filter(lambda (x,y): x in model.__dict__.keys(), rec.iteritems()))).first()
    if instance:
        return instance
    else:
        instance = model(rec)
        return instance
I'm using GUIDs for my id field, and it is part of the primary key for the address table:
class address(Base):
    __tablename__ = 'address'
    id = id_column()
    name = Column(String(75), primary_key=True)
    Address_Line_One = Column(String(50), primary_key=True)
    Address_Line_Two = Column(String(50), primary_key=True)
    Address_Line_Three = Column(String(50), primary_key=True)
    Address_Line_Four = Column(String(50), primary_key=True)
The id_column() comes from here, though it has been converted to CHAR(32) due to limitations elsewhere. Finally, there is the snippet here:
currStatement = statements(rec, id=currGUID)
currStatement.address = self.get_or_create(address, rec)
This all works fine except it is very slow. For ~65,000 statements inserted in one transaction, I see 1.5 hr insert time on a clean test DB. Watching the insert in realtime shows it quickly get to ~10,000 rows, then the insert speed starts falling off.
What can I do to speed up this insert time?
Edit:
After further testing, I've found that the slow insert time is partially because each object is inserted individually. So, I have ~65,000 rows, each of which becomes several sqlalchemy objects, inserted individually. With sqlalchemy 0.7, how can I bulk insert my objects?
Reputation: 14900
Alright!
So the answer is that I was inserting each row individually, and round-tripping to the DB for each address check. The address check was the worst part, since it kept getting slower as the address table grew. I calculated that inserting the original data (1.5 hrs), and then inserting the same data again, would take ~9 hrs!
So this answer will go over what I did to convert to bulk insert statements, as well as some things to watch out for.
ORM is great, but realize it doesn't exactly mesh well with bulk inserts. Bulk inserts require using the lower level execute statements on the session. These don't take ORM objects as inputs, but a list of dictionaries and an insert object. So if you're converting a csv file full of rows into ORM objects, you need to NOT add them to the current session, but instead convert them to dictionaries for later.
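import uuid
from sqlalchemy.orm import class_mapper

def asdict(obj):
    # Map each column name of the object's table to its current value.
    return dict((col.name, getattr(obj, col.name))
                for col in class_mapper(obj.__class__).mapped_table.c)

# For each row (rec) of the csv file:
currGUID = uuid.uuid4()
currPrintOrMail = printOrMail(rec, id=currGUID)
currStatement = statements(rec, id=currGUID)
currAddress = self.get_or_create(address, rec)
currStatement.address = currAddress
# Keep plain dictionaries for the bulk insert instead of adding to the session.
self.currPrintOrMail_bulk.append(asdict(currPrintOrMail))
self.currStatement_bulk.append(asdict(currStatement))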
The asdict method comes from here. That gets you dictionaries of the columns in the ORM objects created. They never get added to the session, and drop out of memory shortly thereafter.
If you have set up an ORM relationship:
class statements(Base):
    __tablename__ = 'statements'
    id = id_column()
    county = Column(String(50), default='', nullable=False)
    address_id = Column(CHAR(36), ForeignKey('address.id'))
    address = relationship("address", backref=backref("statements", cascade=""))
    printOrMail_id = Column(CHAR(36), ForeignKey('printOrMail.id'))
    pom = relationship("printOrMail", backref=backref("statements", cascade=""))
    property_id = Column(CHAR(36), ForeignKey('property.id'))
    prop = relationship("property", backref=backref("statements", cascade=""))
Make sure cascade is blank in the backref! Otherwise, inserting an object in the relationship into the session will cascade through the rest of the objects. When you try to bulk insert your values later, they will be rejected as duplicates...if you're lucky.
This is important because part of the requirements was getting the address_id for a valid address if it existed, and adding the address if it did not. Since the query round tripping was so slow, I changed get_or_create to:
def get_or_create(self, model, rec):
    """Check if current session has address. If not, query DB for it. If no one has the address, create and flush a new one to the session."""
    instance = self.session.query(model).get((rec['Name'], rec['Address_Line_One'], rec['Address_Line_Two'], rec['Address_Line_Three'], rec['Address_Line_Four']))
    if instance:
        return instance
    else:
        instance = model(rec)
        self.session.add(instance)
        self.session.flush()
        return instance
Using get causes sqlalchemy to check the session first, preventing trips across the network. But, it only works if new addresses are added to the session! Remember the relationship? This was cascading into inserts of the statements. Also, if you don't flush() or have autoflush=True then get cannot see the newly added objects.
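To make the "check locally first, only then go to the DB" idea concrete, here is a minimal sketch that swaps the session's identity map for a plain dictionary keyed on the address fields. This is not what I actually ran: AddressCache, lookup_in_db, db_lookups and new_addresses are all hypothetical names made up for the illustration, and the DB lookup is faked with a callable.

```python
import uuid


class AddressCache(object):
    """Hypothetical stand-in for the session's identity map (illustration only)."""

    def __init__(self, lookup_in_db):
        # lookup_in_db is an assumed callable: key tuple -> existing id or None.
        self._lookup_in_db = lookup_in_db
        self._ids = {}            # key tuple -> address id already seen
        self.new_addresses = []   # rows that still have to be inserted
        self.db_lookups = 0       # how many times we had to leave the process

    def get_or_create(self, rec):
        key = (rec['Name'], rec['Address_Line_One'], rec['Address_Line_Two'],
               rec['Address_Line_Three'], rec['Address_Line_Four'])
        if key in self._ids:
            # Seen during this run: no round trip at all.
            return self._ids[key]
        self.db_lookups += 1
        address_id = self._lookup_in_db(key)
        if address_id is None:
            # Unknown everywhere: make a new 32-character hex id and
            # remember the row for a later insert.
            address_id = uuid.uuid4().hex
            self.new_addresses.append(dict(rec, id=address_id))
        self._ids[key] = address_id
        return address_id


# Pretend the address table is empty, so every DB lookup misses.
cache = AddressCache(lookup_in_db=lambda key: None)
rec = {'Name': 'A. Smith', 'Address_Line_One': '1 Main St',
       'Address_Line_Two': '', 'Address_Line_Three': '',
       'Address_Line_Four': ''}
first_id = cache.get_or_create(rec)
second_id = cache.get_or_create(rec)  # served from the dictionary
```

The second call returns the same id without another lookup, which is the behaviour get gives you once the new address has been added and flushed to the session.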
When you create the session, persist your objects!
# sessionmaker() returns a session factory; call it to get the session itself.
self.session = sessionmaker(autoflush=False, expire_on_commit=False)()
If you don't include expire_on_commit=False then you lose your addresses, and start round-tripping again.
Now we've got a list of dictionaries for the ORM objects to insert. But we also need an insert object.
self.session.execute(printOrMail.__table__.insert(), self.currPrintOrMail_bulk)
self.session.execute(statements.__table__.insert(), self.currStatement_bulk)
Buried in the docs, it seems that one can use classname.__table__ for the necessary table object, required by insert. So on the session, using the ORM class to get the table to get the insert object, run an execute with the list of dictionaries. Don't forget to commit afterwards!
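The "one insert statement plus a list of dictionaries" shape is not specific to sqlalchemy. As a rough, self-contained illustration, here is the same pattern with the standard library's sqlite3 module instead of sqlalchemy and MySQL. The two-column table and the sample rows are made up for the example; only the column names id and county come from the statements class above.

```python
import sqlite3

# In-memory database so the sketch runs anywhere; not the MySQL setup above.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE statements (id TEXT PRIMARY KEY, county TEXT NOT NULL)"
)

# The list of dictionaries built up while reading the file (sample data).
statement_bulk = [
    {"id": "a1", "county": "Adams"},
    {"id": "b2", "county": "Boone"},
    {"id": "c3", "county": "Clark"},
]

# One statement, many parameter sets. Using the connection as a context
# manager commits the transaction if no exception is raised.
with conn:
    conn.executemany(
        "INSERT INTO statements (id, county) VALUES (:id, :county)",
        statement_bulk,
    )

row_count = conn.execute("SELECT COUNT(*) FROM statements").fetchone()[0]
```

Each dictionary supplies the named parameters for one row, which is the same role the dictionaries play in the session.execute calls above.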
This will allow you to successfully mix bulk inserting and ORM with relationships and querying for unique entries in sqlalchemy. Just watch out for running out of memory. I had to bulk insert ~30,000 records at a time, otherwise py2.7(32bit) would crash at around 2G used.
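For the batching, something along these lines should do. It is a minimal sketch, not my exact code: chunked is a hypothetical helper name, and the 70,000 dummy rows just stand in for the dictionaries built from the csv file.

```python
from itertools import islice


def chunked(rows, size):
    """Yield successive lists of at most `size` items from `rows`."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


# Dummy rows standing in for the per-statement dictionaries.
rows = [{"id": i} for i in range(70000)]
batch_sizes = [len(batch) for batch in chunked(rows, 30000)]
```

Here batch_sizes comes out as [30000, 30000, 10000]. In the real import each batch would go through one execute call followed by a commit, and the list would be emptied before the next batch is built, so memory use stays bounded.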