Reputation: 2825
I have multiprocessing application which needs upsert (insert, if exists update) functionality.
I decided to approach to upsert using trigger solution. (You add additional column for each upsert-enabled tables named is_upsert and in trigger checks this field, if it is false, you do normal insert, but if it is true, you do upsert logic - try update and if it fails because record doesn't exists you try insert).
Here is trigger logic:
CREATE OR REPLACE FUNCTION upsert_trigger_function_{table}()
RETURNS TRIGGER AS $upsert_trigger_function$
DECLARE
row record;
BEGIN
RAISE NOTICE 'upsert trigger fired, upsert is %%', NEW.{upsert_column};
IF NEW.{upsert_column} THEN
NEW.{upsert_column} := false;
LOOP
UPDATE {table} SET
{update_set}
WHERE
{update_where}
;
IF found THEN
RETURN NULL;
END IF;
BEGIN
INSERT INTO {table} SELECT NEW.*;
RETURN NULL;
EXCEPTION WHEN unique_violation THEN
-- loop
END;
END LOOP;
RETURN NULL;
ELSE
RETURN NEW;
END IF;
END;
$upsert_trigger_function$ LANGUAGE plpgsql;
Test object, (add_upsert just makes above trigger to be installed):
class SimpleItem(PipelinesBase):
__tablename__ = 'simple_item'
id = Column(BigInteger, primary_key=True)
item_type = Column(String, nullable=False, unique=True)
quantity = Column(Integer, nullable=False)
price = Column(Float, nullable=False)
in_stock = Column(Boolean, nullable=False)
arrived = Column(Date)
sys_time = Column(
TSTZRANGE,
nullable=False,
server_default=text("TSTZRANGE(now(), null)"),
)
_upsert = Column(Boolean, nullable=False, server_default=text('false'))
_type_identifier = 1400
add_upsert(SimpleItem, ['item_type'])
Test script
from sqlalchemy.engine import create_engine
from pipelines.settings_proxy import TEST_DB
from sqlalchemy.orm.session import sessionmaker
from test_pipelines.test_persistence.mock_items import SimpleItem
from test_pipelines.test_persistence.helpers import random_simple_item
def main():
engine = create_engine(TEST_DB)
values = random_simple_item(_upsert=True)
session = sessionmaker(engine)()
si = SimpleItem(**values)
session.add(si)
session.commit()
si = SimpleItem(**values)
si.price = 1
session.merge(si)
session.commit()
It works properly when using SQL statesments but when I'm using it along with SQLAlchemy ORM add object there is
Traceback (most recent call last):
File "pipelines/persistence/experiment_with_upsert_field.py", line 59, in <module>
main()
File "pipelines/persistence/experiment_with_upsert_field.py", line 27, in main
session.commit()
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 801, in commit
self.transaction.commit()
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 392, in commit
self._prepare_impl()
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 372, in _prepare_impl
self.session.flush()
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2019, in flush
self._flush(objects)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2137, in _flush
transaction.rollback(_capture_exception=True)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 184, in reraise
raise value
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2101, in _flush
flush_context.execute()
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/unitofwork.py", line 373, in execute
rec.execute(self)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/unitofwork.py", line 532, in execute
uow
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/persistence.py", line 174, in save_obj
mapper, table, insert)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/persistence.py", line 800, in _emit_insert_statements
execute(statement, params)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 914, in execute
return meth(self, multiparams, params)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
compiled_sql, distilled_params
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1159, in _execute_context
result = context._setup_crud_result_proxy()
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 828, in _setup_crud_result_proxy
self._setup_ins_pk_from_implicit_returning(row)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 893, in _setup_ins_pk_from_implicit_returning
for col in table.primary_key
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 891, in <listcomp>
for col, value in [
TypeError: 'NoneType' object is not subscriptable
raise in depths of sqlalchemy.engine.default. I'm pretty sure that it is because my trigger returns NULL when is doing UPSERT and SQLAlchemy tries to propagate object with inserted ID using RETURNING statement. Which obviuosly fails because it is not possible to get proper ID in trigger from it's subordinate INSERT/UPDATE and at the same time block normal normal insert.
Note that I already tested upsert as special function which doesn't work for me because I sacrifice SQLAlchemy assistance with updating complex items (those with relationships to other items).
So here is my question: How can I tell SQLAlchemy to avoid loading inserted objects ID?
Upvotes: 3
Views: 1199
Reputation: 2825
After research and tests I found that it can't be done in SQLAlchemy ORM. However it can be done in SQLAlchemy Core by setting inline
keyword argument to True:
engine.execute(
SimpleItem.__table__.insert(inline=True),
values
)
values['price'] = 1
engine.execute(
SimpleItem.__table__.insert(inline=True),
values
)
Upvotes: 1