Reputation: 443
cities = DBSession.query(City).filter(City.big=='Y').options(joinedload(City.hash)).limit(1)
t0 = time.time()
keyword_statuses = DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(1)
for kw_status in keyword_statuses:
kw_status.status = 1
DBSession.commit()
t0 = time.time()
w = SWorker(threads_no=1, network_server='http://192.168.1.242:8180/', keywords=keyword_statuses, cities=cities, saver=MySqlRawSave(DBSession), loglevel='debug')
w.work()
print 'finished'
The above code selects from table a keyword status with select for update. That it locks the row until the row is updated.
As you can see I update the row and commit the change.
kw_status.status = 1
DBSession.commit()
After that I create a SWorker object which puts tasks in a queue and creates a number of threads that process the queue ( here just one for simplicity ).
The worker when it finishes processing updates the
kw_status.status = 2
DBSession.commit()
at this point I get an exception
(1205, 'Lock wait timeout exceeded; try restarting transaction') 'UPDATE g_search_keyword_status SET status=%s WHERE g_search_keyword_status.keyword_id = %s' (2, 10000001L)
So it seems that the row is locked. But before I start the worker I have updated the status to 1 and I have commit the change so the row should be unlocked.
Also I use a scoped_session
DBSession = scoped_session(
sessionmaker(
autoflush=True,
autocommit=False,
bind=engine
)
)
Upvotes: 2
Views: 4622
Reputation: 443
Problem was lazy loading. Notice
keyword_statuses = DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(1)
for kw_status in keyword_statuses:
kw_status.status = 1
DBSession.commit()
The above code does not loads the result into memory but only when it actually needed.
And in my SWorker I loop through keywordStatus.keyword object, so the table locks foreach thread .
The problem solved when I loaded the results to memory using all()
keyword_statuses = DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(1).all()
Upvotes: 1