Reputation: 1207
I am using SQLAlchemy for a project with a Postgres backend (I'm not specifically tied to SQLAlchemy/Postgres). Currently, I have a 'bulk insert' operation that creates many rows in the database. I could create a bunch of SQLAlchemy sessions in different processes and distribute the job across multiple workers to improve performance. However, it is absolutely critical that this operation be atomic: if one of the inserts fails, the whole transaction needs to be rolled back.
After a bit of research, including threads such as Multi-threaded use of SQLAlchemy, it doesn't seem like this will be trivially parallelizable.
Is there a way to speed up performance without sacrificing atomicity?
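For concreteness, here is a minimal sketch of the kind of single-session bulk insert I mean (the Record model and the connection string are simplified placeholders):

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class Record(Base):  # placeholder model
    __tablename__ = "records"
    id = Column(Integer, primary_key=True)
    payload = Column(String)

engine = create_engine("postgresql+psycopg2://user:secret@localhost/test")
Base.metadata.create_all(bind=engine)

# one session, one transaction: either every row is inserted or none are
with Session(engine) as session, session.begin():
    session.add_all([Record(payload=f"row {i}") for i in range(100_000)])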
Upvotes: 1
Views: 615
Reputation: 146
Since atomicity is critical, parallelization is not an option: each worker process would need its own connection and therefore its own transaction, so a failure in one worker could not undo rows already committed by the others.
If performance is of the essence, SQLAlchemy is probably not the most efficient way to insert large amounts of data into the database. Assuming that you are using psycopg2 as your database driver, you can have a look at the COPY command, which is blazingly fast.
The following example combines SQLAlchemy with the raw copy_expert method of psycopg2. This is the fastest I have ever gotten inserts, reaching a couple of million rows per second.
The PostgreSQL documentation describes the syntax of the COPY command, and the psycopg2 documentation covers copy_expert.
import csv

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base

# set up test data; None becomes an empty CSV field, which COPY reads as NULL
testdata = [
    ["col1", "col2", "text"],
    [21, 50, "mystring"],
    [48, 78, None],
    [49, 78, "string with 'quotes'"],
]

with open("testdata.csv", "w", newline="") as csvfile:
    writer = csv.writer(csvfile, delimiter=",", quotechar='"')
    writer.writerows(testdata)

# define the test table
Base = declarative_base()

class TestTable(Base):
    __tablename__ = "mytable"
    col1 = Column(Integer, primary_key=True)
    col2 = Column(Integer)
    text = Column(String)

# the interesting code starts here
engine = create_engine("postgresql+psycopg2://lukas:123@localhost/test")
Base.metadata.create_all(bind=engine)

# raw_connection returns a psycopg2 connection, which has a copy_expert method
conn = engine.raw_connection()
try:
    with open("testdata.csv", "r", buffering=8192) as f:
        # read the column names from the header row, then rewind
        cols = [c.strip() for c in f.readline().split(",")]
        f.seek(0)
        cur = conn.cursor()
        # These two lines are the gist of it all. COPY executes as a single
        # statement; the HEADER option makes it skip the first row, and
        # unquoted empty fields are read as NULL.
        command = f"""COPY {TestTable.__table__} ({','.join(cols)}) FROM STDIN WITH (FORMAT csv, HEADER, DELIMITER ',', QUOTE '"')"""
        cur.copy_expert(command, f)
        cur.close()
    conn.commit()
except Exception:
    conn.rollback()
    raise
finally:
    conn.close()
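Because the whole load runs as a single COPY statement on one connection, the atomicity requirement is covered: if anything goes wrong mid-load, the rollback in the except branch discards all rows. As a quick sanity check (not part of the recipe itself), you can read the data back through the ORM:

from sqlalchemy.orm import Session

with Session(engine) as session:
    for row in session.query(TestTable).order_by(TestTable.col1).all():
        print(row.col1, row.col2, row.text)  # the None row comes back as None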
Upvotes: 1