xjq233p_1
xjq233p_1

Reputation: 8060

Sqlalchemy session.rollback from IntegrityError causes queuepool to run out of handlers?

I have the following table:

class Feedback(Base):
  __tablename__ = 'feedbacks'
  __table_args__ = (UniqueConstraint('user_id', 'look_id'),)
  id = Column(Integer, primary_key=True)
  user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
  look_id = Column(Integer, ForeignKey('looks.id'), nullable=False)

I am currently inserting a lot of entries into this table that will be violating that UniqueConstraint.

I am using the following code:

  for comment in session.query(Comment).filter(Comment.type == Comment.TYPE_LOOK).yield_per(100):
    feedback = Feedback()
    feedback.user_id = User.get_or_create(comment.model_id).id
    feedback.look_id = comment.commentable_id
    session.add(feedback)
    try:        # Refer to T20
      session.flush()
    except IntegrityError,e:
      print "IntegrityError", e
      session.rollback()
  session.commit()

and I am getting the following error:

IntegrityError (IntegrityError) duplicate key value violates unique constraint "feedbacks_user_id_look_id_key"
DETAIL:  Key (user_id, look_id)=(140, 263008) already exists.
 'INSERT INTO feedbacks (user_id, look_id, score) VALUES (%(user_id)s, %(look_id)s, %(score)s) RETURNING feedbacks.id' {'user_id': 140, 'score': 1, 'look_id': 263008}
IntegrityError (IntegrityError) duplicate key value violates unique constraint "feedbacks_user_id_look_id_key"
...
(there's about 24 of these integrity errors here)
...
DETAIL:  Key (user_id, look_id)=(173, 263008) already exists.
 'INSERT INTO feedbacks (user_id, look_id, score) VALUES (%(user_id)s, %(look_id)s, %(score)s) RETURNING feedbacks.id' {'user_id': 173, 'score': 1, 'look_id': 263008}
No handlers could be found for logger "sqlalchemy.pool.QueuePool"
Traceback (most recent call last):
  File "load.py", line 40, in <module>
    load_crawl_data_into_feedback()
  File "load.py", line 21, in load_crawl_data_into_feedback
    for comment in session.query(Comment).filter(Comment.type == Comment.TYPE_LOOK).yield_per(100):
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2337, in instances
    fetch = cursor.fetchmany(self._yield_per)
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3230, in fetchmany
    self.cursor, self.context)
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3223, in fetchmany
    l = self.process_rows(self._fetchmany_impl(size))
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3343, in _fetchmany_impl
    row = self._fetchone_impl()
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3333, in _fetchone_impl
    self.__buffer_rows()
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3326, in __buffer_rows
    self.__rowbuffer = collections.deque(self.cursor.fetchmany(size))
sqlalchemy.exc.ProgrammingError: (ProgrammingError) named cursor isn't valid anymore None None

Before you jump to conclusions about this error being caused by yield_per, I can assure you that yield_per is not the culprit here.

I tried the same code without the unique constraints and I did not experience any error at all.

I believe that the integrity errors are causing No handlers could be found for logger "sqlalchemy.pool.QueuePool".

I assume that every integrity error is killing each "thread" in the queuepool.

Can someone enlighten me as to what is happening?

If I can't do much about the data at this point, what would you recommend me to do?

Upvotes: 3

Views: 5003

Answers (2)

zzzeek
zzzeek

Reputation: 75237

"Before you jump to conclusions about this error being caused by yield_per, I can assure you that yield_per is not the culprit here. "

I'm not sure why you'd think that - yield_per() is very important here, and this could be divined quickly by simply trying out the same test without the yield_per() to see if the behavior is different. By using yield_per(), a psycopg2 cursor is being kept open as that loop continues. But then you're emitting a ROLLBACK on the psycopg2 connection via session.rollback(). That would exactly cause an error like "named cursor isn't valid anymore " to occur. In fact the only reason there is a named cursor is because that's how you do server side cursors with psycopg2 which is part of what yield_per() enables.

"I tried the same code without the unique constraints and I did not experience any error at all."

this is because without constraints, no exception is thrown, and the rollback() isn't hit.

Upvotes: 4

Eevee
Eevee

Reputation: 48564

That error is just from the Python logging module; your pool class is trying to log some debug message but you don't have SQLA logging configured. Configuring logging is easy, and then you can see what it's actually trying to say.

I'm not quite sure what's going on here, but it certainly doesn't help that you're rolling back your top-level transaction dozens of times. Rollback ends the transaction and invalidates every live row object. That certainly won't interact well with yield_per.

If your database supports savepoints or nesting transactions (i.e., is Postgres or Oracle... or maybe recent MySQL?), try starting a nested transaction for each attempt:

for comment in session.query(Comment).filter(Comment.type == Comment.TYPE_LOOK).yield_per(100):
    try:
        with session.begin_nested():
            feedback = Feedback()
            feedback.user_id = User.get_or_create(comment.model_id).id
            feedback.look_id = comment.commentable_id
            session.add(feedback)
            session.flush()
    except IntegrityError, e:
        print "IntegrityError", e

session.commit()

with rolls back on error and commits on success, so a failed flush won't wreak havoc on the rest of your main transaction.

If you don't have the backend support, the other sensible options coming to mind are:

  • Complexify your query: do a LEFT JOIN with your feedback table so you know in-app whether or not the feedback row already exists.

  • If you're willing to make (user_id, look_id) be your primary key, I think you can use session.merge(feedback). This acts like an insert-or-update based on primary key: if SQLA can find an existing row with the same pk, it'll update that, else it'll create a new one in the database. Would risk firing off an extra SELECT for every new row, though.

Upvotes: 5

Related Questions