smang

Reputation: 1207

Parallelizable sqlalchemy

I am using sqlalchemy for a project with a postgres backend (I'm not specifically tied to sqlalchemy/postgres). Currently, I have a 'bulk insert' operation that creates many rows in the database. I could create a bunch of sqlalchemy sessions in different processes, and distribute the job across multiple workers to improve performance. However, it is absolutely critical that this operation is atomic. If one of the inserts fails, the whole transaction needs to be rolled back.

After a bit of research, and after reading threads such as "Multi-threaded use of SQLAlchemy", it doesn't seem like this will be trivially parallelizable.

Is there a way I can help speed up performance, without sacrificing atomicity?

Upvotes: 1

Views: 615

Answers (1)

habet

Reputation: 146

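Since atomicity is critical, parallelization is not really an option: a transaction lives on a single database connection, so inserts spread across several sessions or processes cannot simply be committed or rolled back as one unit.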

If performance is of the essence, sqlalchemy is probably not the most efficient way to insert large amounts of data into the database. Assuming that you are using psycopg2 as your database driver, you can have a look at the COPY command (which is blazingly fast).

Have a look at the following example which combines sqlalchemy and the raw copy_expert command of psycopg2. This is the fastest I have ever gotten inserts, resulting in a couple of million rows per second.

The syntax for the COPY command is documented here, and the documentation for copy_expert is located here.

import csv
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base

# setup testdata
testdata = [
    ["col1", "col2", "text"],
    [21, 50, "mystring"],
    [48, 78, None],
    [49, 78, "string with 'quotes'"],
]
# newline="" lets the csv module control line endings (avoids blank rows on Windows)
with open("testdata.csv", "w", newline="") as csvfile:
    writer = csv.writer(csvfile, delimiter=",", quotechar='"')
    writer.writerows(testdata)



# Define Testtable
Base = declarative_base()

class TestTable(Base):
    __tablename__ = "mytable"

    col1 = Column(Integer, primary_key=True)
    col2 = Column(Integer)
    text = Column(String)



# interesting code starts here
engine = create_engine("postgresql+psycopg2://lukas:123@localhost/test")
Base.metadata.create_all(bind=engine)


# raw_connection returns a psycopg2 connection which has a copy_expert method
conn = engine.raw_connection()

try:
    with open("testdata.csv", "r", buffering=8192) as f:
        cols = [c.strip() for c in f.readline().split(",")]
        f.seek(0)
        cur = conn.cursor()

        #  These two lines are the gist of it all
        command = f"""COPY {TestTable.__table__}({','.join(cols)}) FROM STDIN WITH (format csv, header, delimiter ',', quote '"', null "\'\'")"""
        cur.copy_expert(command, f)

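        cur.close()
        conn.commit()
except Exception:
    # undo the whole COPY so the load stays all-or-nothing
    conn.rollback()
    raise
finally:
    # release the connection whether the COPY succeeded or failed
    conn.close()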
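
If the rows already live in Python, the intermediate file on disk is not strictly needed. Below is a minimal sketch, not a definitive implementation, that serializes the rows into an in-memory buffer and hands that buffer to copy_expert inside a single transaction. The helper names rows_to_csv_buffer and copy_rows are hypothetical, and conn is assumed to be a psycopg2 connection such as the one returned by engine.raw_connection(). The sketch relies on the CSV-format default of treating an unquoted empty field as NULL, so None values written by the csv module should arrive as NULL.

```python
import csv
import io


def rows_to_csv_buffer(header, rows):
    """Serialize rows into an in-memory CSV buffer, header line first."""
    buf = io.StringIO()
    writer = csv.writer(buf, delimiter=",", quotechar='"', lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)  # None is written as an empty, unquoted field
    buf.seek(0)  # rewind so the consumer reads from the start
    return buf


def copy_rows(conn, table_name, header, rows):
    """Load all rows with one COPY in one transaction (hypothetical helper).

    Assumes `conn` is a psycopg2 connection. `table_name` and `header` are
    interpolated into the SQL text, so they must come from trusted code.
    """
    buf = rows_to_csv_buffer(header, rows)
    command = (
        f"COPY {table_name}({','.join(header)}) "
        "FROM STDIN WITH (format csv, header)"
    )
    cur = conn.cursor()
    try:
        cur.copy_expert(command, buf)
        conn.commit()
    except Exception:
        # any failure undoes the whole load
        conn.rollback()
        raise
    finally:
        cur.close()


# Building the buffer needs no database, so it can be inspected directly
buf = rows_to_csv_buffer(
    ["col1", "col2", "text"],
    [[21, 50, "mystring"], [48, 78, None]],
)
csv_text = buf.getvalue()
```

With the table from the example above, a call would look like copy_rows(engine.raw_connection(), "mytable", ["col1", "col2", "text"], rows). Because everything goes through one connection and one commit, a failing row rolls back the entire load.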

Upvotes: 1
