Giorgos Komnino

Reputation: 443

MySQL and SQLAlchemy: how does with_lockmode('update') work?

cities = DBSession.query(City).filter(City.big=='Y').options(joinedload(City.hash)).limit(1)

t0 = time.time()
keyword_statuses = DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(1)

for kw_status in keyword_statuses:
    kw_status.status = 1
    DBSession.commit() 

t0 = time.time()
w = SWorker(threads_no=1, network_server='http://192.168.1.242:8180/', keywords=keyword_statuses, cities=cities, saver=MySqlRawSave(DBSession), loglevel='debug')

w.work()

print 'finished'

The above code selects a keyword status from the table with SELECT ... FOR UPDATE. That is, it locks the row until the transaction that acquired the lock ends.

As you can see, I update the row and commit the change:

kw_status.status = 1
DBSession.commit()    

After that I create a SWorker object, which puts tasks in a queue and creates a number of threads that process the queue (here just one, for simplicity).

When the worker finishes processing, it updates the status:

kw_status.status = 2
DBSession.commit()

At this point I get an exception:

(1205, 'Lock wait timeout exceeded; try restarting transaction') 'UPDATE g_search_keyword_status SET status=%s WHERE g_search_keyword_status.keyword_id = %s' (2, 10000001L)

So it seems the row is still locked. But before starting the worker I updated the status to 1 and committed the change, so the row should have been unlocked.

Also, I use a scoped_session:

DBSession = scoped_session(
    sessionmaker(
        autoflush=True,
        autocommit=False,
        bind=engine,
    )
)
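Note that scoped_session hands each thread its own Session, so the worker threads run in transactions separate from the one that made the first commit. A minimal stdlib sketch of that thread-local registry pattern (a toy analogue, not SQLAlchemy's actual implementation):

```python
import threading

class ScopedRegistry:
    """Toy analogue of scoped_session: one object per thread."""
    def __init__(self, factory):
        self.factory = factory
        self.local = threading.local()

    def __call__(self):
        # First call in a given thread creates that thread's instance;
        # later calls in the same thread return the same instance.
        if not hasattr(self.local, "obj"):
            self.local.obj = self.factory()
        return self.local.obj

Session = ScopedRegistry(object)  # object() stands in for sessionmaker()

main_session = Session()
seen = []

def worker():
    # A different thread gets a different session.
    seen.append(Session() is main_session)

t = threading.Thread(target=worker)
t.start()
t.join()

print(seen)  # [False]: the worker thread got its own session
```

This is why a query re-executed inside a worker thread runs in a fresh transaction, which becomes relevant in the answer below.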

Upvotes: 2

Views: 4622

Answers (1)

Giorgos Komnino

Reputation: 443

The problem was lazy evaluation of the query. Notice:

keyword_statuses =  DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(1)

for kw_status in keyword_statuses:
    kw_status.status = 1
DBSession.commit() 

The above code does not load the results into memory; the query is only executed when its results are actually needed, i.e. when it is iterated.

And in my SWorker I loop over the KeywordStatus.keyword objects again, so the SELECT ... FOR UPDATE is re-executed and the rows are locked anew in each thread.
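The effect can be sketched with a toy stand-in for a SQLAlchemy Query (hypothetical class, not the real API): each for-loop over the query object re-runs the SQL, while all() materialises the results once into a plain list.

```python
# Toy stand-in for a SQLAlchemy Query: each iteration re-runs the "SQL".
executions = []

class LazyQuery:
    def __init__(self, rows):
        self.rows = rows

    def __iter__(self):
        executions.append("SELECT ... FOR UPDATE")  # re-issued on every loop
        return iter(self.rows)

    def all(self):
        # Materialise once; later loops reuse the in-memory list.
        return list(self)

q = LazyQuery([1, 2, 3])

list(q)                  # first loop: executes the query
list(q)                  # second loop: executes it AGAIN (re-locking rows)
print(len(executions))   # 2

rows = q.all()           # third execution, but results are now a plain list
list(rows)
list(rows)
print(len(executions))   # still 3: no further executions
```

With the real Query, that second execution inside a worker thread runs in a new transaction and tries to take the row locks again, which is exactly the lock wait timeout seen in the question.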

The problem was solved when I loaded the results into memory using all():

keyword_statuses =  DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(1).all()

Upvotes: 1
