Reputation: 8060
I have the following table:
from sqlalchemy import Column, Integer, ForeignKey, UniqueConstraint

class Feedback(Base):
    __tablename__ = 'feedbacks'
    __table_args__ = (UniqueConstraint('user_id', 'look_id'),)
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    look_id = Column(Integer, ForeignKey('looks.id'), nullable=False)
I am currently inserting a lot of entries into this table that will be violating that UniqueConstraint.
I am using the following code:
for comment in session.query(Comment).filter(Comment.type == Comment.TYPE_LOOK).yield_per(100):
    feedback = Feedback()
    feedback.user_id = User.get_or_create(comment.model_id).id
    feedback.look_id = comment.commentable_id
    session.add(feedback)
    try:  # Refer to T20
        session.flush()
    except IntegrityError as e:
        print "IntegrityError", e
        session.rollback()
session.commit()
and I am getting the following error:
IntegrityError (IntegrityError) duplicate key value violates unique constraint "feedbacks_user_id_look_id_key"
DETAIL: Key (user_id, look_id)=(140, 263008) already exists.
'INSERT INTO feedbacks (user_id, look_id, score) VALUES (%(user_id)s, %(look_id)s, %(score)s) RETURNING feedbacks.id' {'user_id': 140, 'score': 1, 'look_id': 263008}
IntegrityError (IntegrityError) duplicate key value violates unique constraint "feedbacks_user_id_look_id_key"
...
(there are about 24 of these integrity errors here)
...
DETAIL: Key (user_id, look_id)=(173, 263008) already exists.
'INSERT INTO feedbacks (user_id, look_id, score) VALUES (%(user_id)s, %(look_id)s, %(score)s) RETURNING feedbacks.id' {'user_id': 173, 'score': 1, 'look_id': 263008}
No handlers could be found for logger "sqlalchemy.pool.QueuePool"
Traceback (most recent call last):
File "load.py", line 40, in <module>
load_crawl_data_into_feedback()
File "load.py", line 21, in load_crawl_data_into_feedback
for comment in session.query(Comment).filter(Comment.type == Comment.TYPE_LOOK).yield_per(100):
File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2337, in instances
fetch = cursor.fetchmany(self._yield_per)
File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3230, in fetchmany
self.cursor, self.context)
File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3223, in fetchmany
l = self.process_rows(self._fetchmany_impl(size))
File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3343, in _fetchmany_impl
row = self._fetchone_impl()
File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3333, in _fetchone_impl
self.__buffer_rows()
File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3326, in __buffer_rows
self.__rowbuffer = collections.deque(self.cursor.fetchmany(size))
sqlalchemy.exc.ProgrammingError: (ProgrammingError) named cursor isn't valid anymore None None
Before you jump to conclusions about this error being caused by yield_per, I can assure you that yield_per is not the culprit here.
I tried the same code without the unique constraints and I did not experience any error at all.
I believe that the integrity errors are causing the "No handlers could be found for logger sqlalchemy.pool.QueuePool" message.
I assume that every integrity error is killing a "thread" in the QueuePool.
Can someone enlighten me as to what is happening?
If I can't do much about the data at this point, what would you recommend I do?
Upvotes: 3
Views: 5003
Reputation: 75237
"Before you jump to conclusions about this error being caused by yield_per, I can assure you that yield_per is not the culprit here. "
I'm not sure why you'd think that; yield_per() is very important here, and this could be divined quickly by simply trying the same test without the yield_per() to see if the behavior is different. By using yield_per(), a psycopg2 cursor is being kept open as that loop continues. But then you're emitting a ROLLBACK on the psycopg2 connection via session.rollback(). That would exactly cause an error like "named cursor isn't valid anymore" to occur. In fact, the only reason there is a named cursor at all is because that's how you do server-side cursors with psycopg2, which is part of what yield_per() enables.
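To see the mechanism outside of SQLAlchemy, here is a minimal raw-psycopg2 sketch of that failure mode (the DSN and table name are placeholders, not from the question). A named cursor is a server-side cursor, and it only lives as long as the transaction that opened it, so a ROLLBACK on the same connection invalidates it mid-iteration:
import psycopg2

conn = psycopg2.connect("dbname=test")    # placeholder DSN
cur = conn.cursor(name="c1")              # named cursor = server-side cursor
cur.execute("SELECT id FROM comments")    # placeholder table
cur.fetchmany(100)                        # fine: the transaction is still open

conn.rollback()                           # what session.rollback() ultimately emits

cur.fetchmany(100)                        # raises ProgrammingError: named cursor
                                          # isn't valid anymore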
"I tried the same code without the unique constraints and I did not experience any error at all."
This is because without constraints, no exception is thrown, and the rollback() isn't hit.
Upvotes: 4
Reputation: 48564
That error is just from the Python logging module; your pool class is trying to log some debug message, but you don't have SQLA logging configured. Configuring logging is easy, and then you can see what it's actually trying to say.
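For instance, one basicConfig() call is enough to attach a default handler to every logger, including sqlalchemy.pool.QueuePool; the level tweak below is optional, for when you also want the pool's debug chatter:
import logging

logging.basicConfig()  # default handler for all loggers; the warning goes away
logging.getLogger('sqlalchemy.pool').setLevel(logging.DEBUG)  # optional: show pool messages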
I'm not quite sure what's going on here, but it certainly doesn't help that you're rolling back your top-level transaction dozens of times. Rollback ends the transaction and invalidates every live row object. That certainly won't interact well with yield_per.
If your database supports savepoints or nesting transactions (i.e., is Postgres or Oracle... or maybe recent MySQL?), try starting a nested transaction for each attempt:
for comment in session.query(Comment).filter(Comment.type == Comment.TYPE_LOOK).yield_per(100):
    try:
        with session.begin_nested():
            feedback = Feedback()
            feedback.user_id = User.get_or_create(comment.model_id).id
            feedback.look_id = comment.commentable_id
            session.add(feedback)
            session.flush()
    except IntegrityError as e:
        print "IntegrityError", e
session.commit()
The with block rolls back on error and commits on success, so a failed flush won't wreak havoc on the rest of your main transaction.
If you don't have the backend support, the other sensible options coming to mind are:
Complexify your query: do a LEFT JOIN with your feedback table so you know in-app whether or not the feedback row already exists.
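A rough sketch of that approach, with one big assumption flagged: it compares Comment.model_id to Feedback.user_id directly, whereas the question routes it through User.get_or_create(), so the join condition may need adjusting to your real mapping:
from sqlalchemy import and_

q = (session.query(Comment, Feedback)
     .outerjoin(Feedback, and_(
         Feedback.user_id == Comment.model_id,       # assumption: model_id is the user id
         Feedback.look_id == Comment.commentable_id))
     .filter(Comment.type == Comment.TYPE_LOOK))

for comment, existing in q.yield_per(100):
    if existing is None:                             # no feedback row yet, safe to insert
        feedback = Feedback()
        feedback.user_id = User.get_or_create(comment.model_id).id
        feedback.look_id = comment.commentable_id
        session.add(feedback)
session.commit()
Note this only catches rows that already exist in the database; if the comment stream itself contains the same (user_id, look_id) pair twice, you'd still want to track pairs you've inserted during the loop, e.g. in a set.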
If you're willing to make (user_id, look_id) be your primary key, I think you can use session.merge(feedback). This acts like an insert-or-update based on primary key: if SQLA can find an existing row with the same pk, it'll update that; otherwise it'll create a new one in the database. It would risk firing off an extra SELECT for every new row, though.
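A sketch of the merge() route, assuming you drop the surrogate id column and make the pair the composite primary key (session is a configured Session as in the question; the sample values are taken from the error output):
from sqlalchemy import Column, Integer, ForeignKey
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Feedback(Base):
    __tablename__ = 'feedbacks'
    # composite pk replaces the surrogate id + unique constraint
    user_id = Column(Integer, ForeignKey('users.id'), primary_key=True)
    look_id = Column(Integer, ForeignKey('looks.id'), primary_key=True)

# merge() SELECTs by primary key first: an existing (140, 263008) row is
# updated in place, a missing one is INSERTed, so no IntegrityError fires.
session.merge(Feedback(user_id=140, look_id=263008))
session.commit()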
Upvotes: 5