kowpow
kowpow

Reputation: 125

Python SQLAlchemy ON DUPLICATE KEY UPDATE with multiple records

I'd like to use the ON DUPLICATE KEY UPDATE optionality provided by SQLAlchemy to upsert a bunch of records.

These records have been sucessfully inserted with python using the following (where connection is engine.connect() object and table is a Table object)

record_list = [{'col1': 'name1', 'col2': '2015-01-31', 'col3': 27.2},
               {'col1': 'name1', 'col2': '2016-01-31', 'col3': 25.2}]
query = insert(table)
results = connection.execute(query, record_list)

Looking at the docs at https://docs.sqlalchemy.org/en/13/dialects/mysql.html#insert-on-duplicate-key-update-upsert as well as a number of SO questions (including the suggestion it's possible under the comments on SQLAlchemy ON DUPLICATE KEY UPDATE ) I've tried a number of different examples, but there were none that I could see that address multiple records with the upsert statement using this method.

I'm trying along the lines of

query = insert(table).values(record_list)
upsert_query = query.on_duplicate_key_update()
results = connection.execute(upsert_query)

but either get the issue that the .on_duplicate_key_update() requires cant be empty or that the SQL syntax is wrong.

If anyone has sucessfully managed and could help me with the code structure here I'd really appreciate it.

Upvotes: 1

Views: 5073

Answers (4)

chrisinmtown
chrisinmtown

Reputation: 4324

Thanks to Federico Caselli of the SQLAlchemy project for explaining how to use on_duplicate_key_update in a discussion https://github.com/sqlalchemy/sqlalchemy/discussions/9328

Here's a Python3 script that demonstrates how to use SQLAlchemy version 2 to implement upsert using on_duplicate_key_update in the MySQL dialect:

import sqlalchemy as db
import sqlalchemy.dialects.mysql as mysql
from sqlalchemy import delete, select, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "foo"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))


engine = db.create_engine('mysql+mysqlconnector://USER-NAME-HERE:PASS-WORD-HERE@localhost/SCHEMA-NAME-HERE')
conn = engine.connect()

# setup step 0 - ensure the table exists
Base().metadata.create_all(bind=engine)

# setup step 1 - clean out rows with id 1..5
del_stmt = delete(User).where(User.id.in_([1, 2, 3, 4, 5]))
conn.execute(del_stmt)
conn.commit()
sel_stmt = select(User)
users = list(conn.execute(sel_stmt))
print(f'Table size after cleanout: {len(users)}')

# setup step 2 - insert 4 rows
ins_stmt = mysql.insert(User).values(
    [
        {"id": 1, "name": "x"},
        {"id": 2, "name": "y"},
        {"id": 3, "name": "w"},
        {"id": 4, "name": "z"},
    ]
)
conn.execute(ins_stmt)
conn.commit()
users = list(conn.execute(sel_stmt))
print(f'Table size after insert: {len(users)}')

# demonstrate upsert
ups_stmt = mysql.insert(User).values(
    [
        {"id": 1, "name": "xx"},
        {"id": 2, "name": "yy"},
        {"id": 3, "name": "ww"},
        {"id": 5, "name": "new"},
    ]
)
ups_stmt = ups_stmt.on_duplicate_key_update(name=ups_stmt.inserted.name)
# if you want to see the compiled result
# x = ups_stmt.compile(dialect=mysql.dialect())
# print(x.string, x.construct_params())
conn.execute(ups_stmt)
conn.commit()

users = list(conn.execute(sel_stmt))
print(f'Table size after upsert: {len(users)}')

Upvotes: 3

songofhawk
songofhawk

Reputation: 153

@user12730260’s answer is great! but has a little bug, the correct code is:

query = insert(table).values(record_list)   # each record is a dict
update_dict = {x.name: x for x in query.inserted}  # specify columns for update, u can filter some of it
upsert_query = query.on_duplicate_key_update(**update_dict) # here's the modification: u should expand the columns dict

Upvotes: 2

user12730260
user12730260

Reputation: 126

I just ran into a similar problem and creating a dictionary out of query.inserted solved it for me.

query = insert(table).values(record_list)
update_dict = {x.name: x for x in query.inserted}
upsert_query = query.on_duplicate_key_update(update_dict)

Upvotes: 11

julian
julian

Reputation: 471

Your on_duplicate_key_update function requires arguments that define the data to be inserted in the update. Please have a look at the example in the documentation that you have already found.

insert().on_duplicate_key_update({"key": "value"})

Upvotes: 0

Related Questions