Reputation: 125
I'd like to use the ON DUPLICATE KEY UPDATE optionality provided by SQLAlchemy to upsert a bunch of records.
These records have been sucessfully inserted with python using the following (where connection is engine.connect() object and table is a Table object)
record_list = [{'col1': 'name1', 'col2': '2015-01-31', 'col3': 27.2},
{'col1': 'name1', 'col2': '2016-01-31', 'col3': 25.2}]
query = insert(table)
results = connection.execute(query, record_list)
Looking at the docs at https://docs.sqlalchemy.org/en/13/dialects/mysql.html#insert-on-duplicate-key-update-upsert as well as a number of SO questions (including the suggestion it's possible under the comments on SQLAlchemy ON DUPLICATE KEY UPDATE ) I've tried a number of different examples, but there were none that I could see that address multiple records with the upsert statement using this method.
I'm trying along the lines of
query = insert(table).values(record_list)
upsert_query = query.on_duplicate_key_update()
results = connection.execute(upsert_query)
but either get the issue that the .on_duplicate_key_update() requires cant be empty or that the SQL syntax is wrong.
If anyone has sucessfully managed and could help me with the code structure here I'd really appreciate it.
Upvotes: 1
Views: 5073
Reputation: 4324
Thanks to Federico Caselli of the SQLAlchemy project for explaining how to use on_duplicate_key_update
in a discussion https://github.com/sqlalchemy/sqlalchemy/discussions/9328
Here's a Python3 script that demonstrates how to use SQLAlchemy version 2 to implement upsert using on_duplicate_key_update
in the MySQL dialect:
import sqlalchemy as db
import sqlalchemy.dialects.mysql as mysql
from sqlalchemy import delete, select, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "foo"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30))
engine = db.create_engine('mysql+mysqlconnector://USER-NAME-HERE:PASS-WORD-HERE@localhost/SCHEMA-NAME-HERE')
conn = engine.connect()
# setup step 0 - ensure the table exists
Base().metadata.create_all(bind=engine)
# setup step 1 - clean out rows with id 1..5
del_stmt = delete(User).where(User.id.in_([1, 2, 3, 4, 5]))
conn.execute(del_stmt)
conn.commit()
sel_stmt = select(User)
users = list(conn.execute(sel_stmt))
print(f'Table size after cleanout: {len(users)}')
# setup step 2 - insert 4 rows
ins_stmt = mysql.insert(User).values(
[
{"id": 1, "name": "x"},
{"id": 2, "name": "y"},
{"id": 3, "name": "w"},
{"id": 4, "name": "z"},
]
)
conn.execute(ins_stmt)
conn.commit()
users = list(conn.execute(sel_stmt))
print(f'Table size after insert: {len(users)}')
# demonstrate upsert
ups_stmt = mysql.insert(User).values(
[
{"id": 1, "name": "xx"},
{"id": 2, "name": "yy"},
{"id": 3, "name": "ww"},
{"id": 5, "name": "new"},
]
)
ups_stmt = ups_stmt.on_duplicate_key_update(name=ups_stmt.inserted.name)
# if you want to see the compiled result
# x = ups_stmt.compile(dialect=mysql.dialect())
# print(x.string, x.construct_params())
conn.execute(ups_stmt)
conn.commit()
users = list(conn.execute(sel_stmt))
print(f'Table size after upsert: {len(users)}')
Upvotes: 3
Reputation: 153
@user12730260’s answer is great! but has a little bug, the correct code is:
query = insert(table).values(record_list) # each record is a dict
update_dict = {x.name: x for x in query.inserted} # specify columns for update, u can filter some of it
upsert_query = query.on_duplicate_key_update(**update_dict) # here's the modification: u should expand the columns dict
Upvotes: 2
Reputation: 126
I just ran into a similar problem and creating a dictionary out of query.inserted solved it for me.
query = insert(table).values(record_list)
update_dict = {x.name: x for x in query.inserted}
upsert_query = query.on_duplicate_key_update(update_dict)
Upvotes: 11
Reputation: 471
Your on_duplicate_key_update function requires arguments that define the data to be inserted in the update. Please have a look at the example in the documentation that you have already found.
insert().on_duplicate_key_update({"key": "value"})
Upvotes: 0