Reputation: 190
I am running into the dreaded MySQL "Commands out of sync" error when using a custom DB library with Celery.
The library is as follows:
    import pymysql
    import pymysql.cursors
    from furl import furl
    from flask import current_app

    class LegacyDB:
        """Db
        Legacy Database connectivity library
        """
        def __init__(self,app):
            with app.app_context():
                self.rc = current_app.config['RAVEN']
                self.logger = current_app.logger
                self.data = {}
                # setup Mysql
                try:
                    uri = furl(current_app.config['DBCX'])
                    self.dbcx = pymysql.connect(
                        host=uri.host,
                        user=uri.username,
                        passwd=uri.password,
                        db=str(uri.path.segments[0]),
                        port=int(uri.port),
                        cursorclass=pymysql.cursors.DictCursor
                    )
                except:
                    self.rc.captureException()

        def query(self, sql, params = None, TTL=36):
            # INPUT 1 : SQL query
            # INPUT 2 : Parameters
            # INPUT 3 : Time To Live
            # OUTPUT  : Array of result
            # check that we're still connected to the
            # database before we fire off the query
            try:
                db_cursor = self.dbcx.cursor()
                if params:
                    self.logger.debug("%s : %s" % (sql, params))
                    db_cursor.execute(sql,params)
                    self.dbcx.commit()
                else:
                    self.logger.debug("%s" % sql)
                    db_cursor.execute(sql)
                self.data = db_cursor.fetchall()
                if self.data == None:
                    self.data = {}
                db_cursor.close()
            except Exception as ex:
                if ex[0] == "2006":
                    db_cursor.close()
                    self.connect()
                    db_cursor = self.dbcx.cursor()
                    if params:
                        db_cursor.execute(sql,params)
                        self.dbcx.commit()
                    else:
                        db_cursor.execute(sql)
                    self.data = db_cursor.fetchall()
                    db_cursor.close()
                else:
                    self.rc.captureException()
            return self.data
The purpose of the library is to work alongside SQLAlchemy whilst I migrate a legacy database schema from a C++-based system to a Python-based system.
All configuration is done via a Flask application, and the app.config['DBCX'] value has the same form as a SQLAlchemy connection string ("mysql://user:pass@host:port/dbname"), allowing me to switch over easily in future.
I have a number of tasks that run INSERT statements via Celery, all of which use this library. The main reason for running Celery is to increase throughput on this application. However, I seem to be hitting a threading issue in my library or the application: after a while (around 500 processed messages) I see the following in the logs:
    Stacktrace (most recent call last):
      File "legacy/legacydb.py", line 49, in query
        self.dbcx.commit()
      File "pymysql/connections.py", line 662, in commit
        self._read_ok_packet()
      File "pymysql/connections.py", line 643, in _read_ok_packet
        raise OperationalError(2014, "Command Out of Sync")
I'm obviously doing something wrong to hit this error, however it doesn't seem to matter whether MySQL has autocommit enabled/disabled or where I place my connection.commit() call.
If I leave out the connection.commit() then I don't get anything inserted into the database.
I've recently moved from MySQLdb to PyMySQL and the errors appear to occur less often. However, given that these are simple INSERT statements and not complicated SELECTs (there aren't even any foreign key constraints on this database!), I'm struggling to work out where the issue is.
As things stand at present, I am unable to use executemany as I cannot prepare the statements in advance (I am pulling data from a "firehose" message queue and storing it locally for later processing).
Upvotes: 2
Views: 1890
Reputation: 34175
Is __init__ called once, or once per worker? If only once, every worker ends up sharing the same connection, so you need to move the initialisation.
How about lazily initialising the connection in a thread-local variable the first time query is called?
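A minimal sketch of that idea, assuming the workers are threads. The class name ThreadLocalConnection and the connect argument are hypothetical and not part of the original library; connect stands for any zero-argument callable that opens a new connection, for example a lambda wrapping the pymysql.connect(...) call from the question.

```python
import threading


class ThreadLocalConnection:
    """Hold one connection per thread, opened on first use (hypothetical helper)."""

    def __init__(self, connect):
        # connect: zero-argument callable returning a new connection,
        # e.g. lambda: pymysql.connect(host=..., user=..., ...)
        self._connect = connect
        self._local = threading.local()

    def get(self):
        # Each thread sees its own attributes on the threading.local object,
        # so the first call in a thread opens that thread's connection.
        conn = getattr(self._local, "conn", None)
        if conn is None:
            conn = self._connect()
            self._local.conn = conn
        return conn
```

In query, self.dbcx.cursor() would then become something like self.connections.get().cursor(), so nothing is connected in __init__ and no connection object crosses a thread boundary.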
Upvotes: 1
Reputation: 36036
First of all, make sure that the celery thingamajig uses its own connection(s) since
>>> pymysql.threadsafety
1
Which means: "threads may share the module but not connections".
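If the workers are forked processes rather than threads (Celery's default prefork pool works this way, as far as I know), a connection opened before the fork is inherited by every child, which is another way to end up sharing one. A rough sketch of a guard against that, assuming a hypothetical ForkAwareConnection wrapper and the same kind of zero-argument connect callable; it is not thread-safe on its own:

```python
import os


class ForkAwareConnection:
    """Reopen the connection when the current process did not create it (hypothetical helper)."""

    def __init__(self, connect):
        # connect: zero-argument callable returning a new connection,
        # e.g. lambda: pymysql.connect(host=..., user=..., ...)
        self._connect = connect
        self._conn = None
        self._pid = None

    def get(self):
        pid = os.getpid()
        # A connection created under a different pid was inherited across a
        # fork; do not reuse it, open a fresh one for this process instead.
        if self._conn is None or self._pid != pid:
            self._conn = self._connect()
            self._pid = pid
        return self._conn
```

The inherited connection is deliberately not closed in the child, since closing it there would write to the socket the parent may still be using.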
Upvotes: 3