Reputation: 2956
My application uses a scoped session and the declarative style of SQLAlchemy. It's a web app, and a lot of the DB insertions are executed by Celery, a task scheduler.
Typically, when deciding to insert an object, my code might do something along the following lines:
from schema import Session
from schema.models import Bike

pk = 123  # primary key
bike = Session.query(Bike).filter_by(bike_id=pk).first()
if not bike:  # no bike in DB
    new_bike = Bike(pk, "shiny", "bike")
    Session.add(new_bike)
    Session.commit()
The issue here is that because a lot of this is done by asynchronous workers, it's possible for one worker to be halfway through inserting a Bike with id=123 while another one is checking for its existence. In that case the second worker will try to insert a row with the same primary key, and SQLAlchemy will raise an IntegrityError.
I can't for the life of me find a nice way to deal with this issue apart from swapping out Session.commit() for:
'''schema/__init__.py'''
import logging

from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import scoped_session, sessionmaker

logger = logging.getLogger(__name__)
Session = scoped_session(sessionmaker())

def commit(ignore=False):
    try:
        Session.commit()
    except IntegrityError as e:
        reason = str(e)
        logger.warning(reason)
        if not ignore:
            raise e
        if "Duplicate entry" in reason:
            logger.info("%s already in table." % e.params[0])
        Session.rollback()
And then everywhere I had Session.commit() I now have schema.commit(ignore=True), wherever I don't mind that the row is not inserted again.
To me this seems very brittle because of the string checking. Just as an FYI, when an IntegrityError is raised it looks like this:
(IntegrityError) (1062, "Duplicate entry '123' for key 'PRIMARY'")
So of course, if the primary key I was inserting was something like Duplicate entry is a cool thing, then I suppose I could miss IntegrityErrors which weren't actually caused by duplicate primary keys.
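I suppose the check could be made a little less brittle by comparing MySQL's numeric error code instead of the message text. A sketch, assuming a MySQL DBAPI driver (MySQLdb or PyMySQL) that exposes the underlying error as e.orig with the code as its first argument:

MYSQL_ER_DUP_ENTRY = 1062  # MySQL's error code for duplicate keys

def commit(ignore=False):
    try:
        Session.commit()
    except IntegrityError as e:
        Session.rollback()
        # e.orig is the raw DBAPI exception; for MySQL drivers its
        # first argument is the numeric server error code
        if ignore and e.orig.args[0] == MYSQL_ER_DUP_ENTRY:
            logger.info("Row already in table, skipping.")
        else:
            raise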
Are there any better approaches, which maintain the clean SQLAlchemy approach I'm using (as opposed to starting to write out statements in strings etc. . .)
Db is MySQL (though for unit testing I like to use SQLite, and wouldn't want to hinder that ability with any new approaches).
Cheers!
Upvotes: 49
Views: 56195
Reputation: 27
In my case I was creating two separate dicts in Python and appending them to a list to later do a bulk insert. I was using a dict called item_base, appending a few fields to it, and then adding the new dict to my bulk insert list.
What I didn't take into account is that in Python, dictionaries are mutable objects, and when you use the assignment clone = item_base or clone_2 = item_base, you're actually creating a reference to the same dictionary in memory. Any modifications you make to clone or clone_2 will also affect item_base because they all refer to the same underlying dictionary.
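A minimal demonstration of that aliasing behaviour (values are made up):

import copy

item_base = {'start_date': '2021-01-01'}

alias = item_base             # NOT a copy: both names point at one dict
alias['amount'] = 10
print(item_base['amount'])    # 10 -- item_base was mutated too

clone = copy.copy(item_base)  # shallow copy: a new, independent dict
clone['amount'] = 20
print(item_base['amount'])    # still 10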
By importing Python's copy module and using copy.copy(item_base) to build my clone dicts, I was able to resolve my duplicate primary key issue. Hopefully this can save other people the headache I went through.
import copy
from datetime import datetime

def get_charges(self, my_set):
    charges = []
    start_date_str, end_date_str = self.get_start_end_date_str()
    for (id, products) in my_set.items():
        item_base = {}
        line_item = self.get_line_item(end_date_str)
        if line_item:
            item_base = dict(line_item.__dict__)
        else:
            item_base['start_date'] = start_date_str
            item_base['end_date'] = end_date_str
        if products['987']:
            # copy.copy() gives each charge its own dict instead of a
            # reference to the shared item_base
            clone = copy.copy(item_base)
            clone['amount'] = products['987']
            clone['created_date'] = datetime.now()
            clone['quantity'] = 1
            clone['unit_price'] = products['987']
            charges.append(clone)
        if products['123']:
            clone = copy.copy(item_base)
            clone['amount'] = products['123']
            clone['created_date'] = datetime.now()
            clone['quantity'] = 1
            clone['unit_price'] = products['123']
            charges.append(clone)
    return charges
Upvotes: 0
Reputation: 10401
Just roll back and retry them one by one, as simple as that:
from sqlalchemy import insert
from sqlalchemy.exc import IntegrityError, SQLAlchemyError

try:
    self._session.bulk_insert_mappings(mapper, items)
    self._session.commit()
except IntegrityError:
    self._session.rollback()
    logger.info("bulk inserting rows failed, falling back to one-by-one inserts")
    for item in items:
        try:
            self._session.execute(insert(mapper).values(**item))
            self._session.commit()
        except SQLAlchemyError as e:
            self._session.rollback()
            logger.error("Error inserting item %s: %s", item, e)
Upvotes: 0
Reputation: 513
With the following code you can customize what commit() does, so it's useful for more than just this problem.
import logging

from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session as BaseSession, scoped_session, sessionmaker

class SessionWrapper(BaseSession):
    def commit(self, ignore=True):
        try:
            super(SessionWrapper, self).commit()
        except IntegrityError as e:
            if not ignore:
                raise e
            message = e.args[0]
            if "Duplicate entry" in message:
                logging.info("Error while executing %s.\n%s.", e.statement, message)
        finally:
            super(SessionWrapper, self).close()

def session(engine, auto_commit=False):
    session_factory = sessionmaker(class_=SessionWrapper, bind=engine, autocommit=auto_commit)
    return scoped_session(session_factory)

# engine is an Engine and Test a mapped model, both defined elsewhere
Session = session(engine)
s1 = Session()
p = Test(test="xxx", id=1)
s1.add(p)
s1.commit()
s1.close()
Upvotes: 1
Reputation: 1074
If you use session.merge(bike) instead of session.add(bike), then you will not generate primary key errors. The bike will be retrieved and updated or created as needed.
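A minimal sketch against the setup in the question (the positional constructor arguments are assumed to match the question's Bike(pk, "shiny", "bike")):

from schema import Session
from schema.models import Bike

# merge() looks the object up by primary key: the existing row with
# bike_id=123 is updated if present, otherwise a new row is inserted
bike = Session.merge(Bike(123, "shiny", "bike"))
Session.commit()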
Upvotes: 42
Reputation: 3553
Instead of session.add(obj), use the statements below. This is much cleaner, and you won't need a custom commit function like the one you mentioned. Note that it ignores every conflict, not just duplicate keys.
mysql:
self.session.execute(insert(self.table).values(values).prefix_with('IGNORE'))
sqlite:
self.session.execute(insert(self.table).values(values).prefix_with('OR IGNORE'))
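For context, a self-contained sketch of the OR IGNORE variant against an in-memory SQLite database (the model and column names are illustrative, and SQLAlchemy 1.4+ is assumed):

from sqlalchemy import Column, Integer, String, create_engine, insert
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Bike(Base):
    __tablename__ = 'bikes'
    bike_id = Column(Integer, primary_key=True)
    color = Column(String(20))

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

with Session(engine) as session:
    stmt = (insert(Bike.__table__)
            .values(bike_id=123, color='shiny')
            .prefix_with('OR IGNORE'))  # renders INSERT OR IGNORE INTO ...
    session.execute(stmt)  # first execution inserts the row
    session.execute(stmt)  # the duplicate is silently skipped
    session.commit()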
Upvotes: 5
Reputation: 75317
I'm assuming that your primary keys here are natural in some way, which is why you can't rely upon normal autoincrement techniques. So let's say the problem is really one of some unique column you need to insert, which is more common.
If you want the "try to insert, roll back partially on failure" behavior, you use a SAVEPOINT, which with SQLAlchemy is begin_nested(). The next rollback() or commit() only acts upon that SAVEPOINT, not the bigger span of things going on.
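A minimal sketch of that pattern, assuming the Bike model and scoped Session from the question, and SQLAlchemy 1.4+ where begin_nested() works as a context manager:

from sqlalchemy.exc import IntegrityError

from schema import Session
from schema.models import Bike

try:
    # begin_nested() emits SAVEPOINT; an IntegrityError inside the
    # block rolls back only to the savepoint, so the enclosing
    # transaction stays usable
    with Session.begin_nested():
        Session.add(Bike(123, "shiny", "bike"))
except IntegrityError:
    pass  # the row already exists; carry on with the transaction
Session.commit()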
However, overall the pattern here is just one that should really be avoided. What you really want to be doing here is one of three things:
1. Don't run concurrent jobs that deal with the same keys that need to be inserted.
2. Synchronize the jobs somehow on the concurrent keys being worked with.
3. Use some common service to generate new records of this particular type, shared by the jobs (or make sure they're all set up before the jobs run).
If you think about it, #2 takes place in any case with a high degree of isolation. Start two postgres sessions. Session 1:
test=> create table foo(id integer primary key);
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "foo_pkey" for table "foo"
CREATE TABLE
test=> begin;
BEGIN
test=> insert into foo (id) values (1);
Session 2:
test=> begin;
BEGIN
test=> insert into foo(id) values(1);
What you'll see is that session 2 blocks, as the row with PK #1 is locked. I'm not sure if MySQL is smart enough to do this, but that's the correct behavior. If OTOH you try to insert a different PK:
^CCancel request sent
ERROR: canceling statement due to user request
test=> rollback;
ROLLBACK
test=> begin;
BEGIN
test=> insert into foo(id) values(2);
INSERT 0 1
test=> \q
It proceeds just fine without blocking.
The point is if you are doing this kind of PK/UQ contention, your celery tasks are going to serialize themselves anyway, or at least, they should be.
Upvotes: 3
Reputation: 41817
You should handle every IntegrityError the same way: roll back the transaction, and optionally try again. Some databases won't even let you do anything more than that after an IntegrityError. You could also acquire a lock on the table, or a finer-grained lock if the database allows it, at the beginning of the two conflicting transactions.
Using the with statement to explicitly begin a transaction, and automatically commit (or roll back on any exception):
from schema import Session
from schema.models import Bike

session = Session()
with session.begin():
    pk = 123  # primary key
    bike = session.query(Bike).filter_by(bike_id=pk).first()
    if not bike:  # no bike in DB
        new_bike = Bike(pk, "shiny", "bike")
        session.add(new_bike)
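And a sketch of the roll-back-and-retry option mentioned above (the helper name and retry count are made up for illustration):

from sqlalchemy.exc import IntegrityError

def insert_bike_if_missing(pk, retries=1):
    session = Session()
    for attempt in range(retries + 1):
        try:
            # session.begin() commits on success and rolls back on
            # any exception raised inside the block
            with session.begin():
                bike = session.query(Bike).filter_by(bike_id=pk).first()
                if not bike:
                    session.add(Bike(pk, "shiny", "bike"))
            return
        except IntegrityError:
            # another worker inserted the same key first; on retry
            # the query will find the row and nothing gets inserted
            if attempt == retries:
                raise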
Upvotes: 10