Reputation: 288
Let's have a look at the next snippet:
from sqlalchemy import create_engine, event, exc
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import Pool

@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
    cursor = dbapi_con.cursor()
    try:
        cursor.execute("SELECT 1")  # could also be dbapi_con.ping(),
                                    # not sure what is better
    except exc.OperationalError as ex:
        if ex.args[0] in (2006,   # MySQL server has gone away
                          2013,   # Lost connection to MySQL server during query
                          2055):  # Lost connection to MySQL server at '%s', system error: %d
            # caught by pool, which will retry with a new connection
            raise exc.DisconnectionError()
        else:
            raise

engine = create_engine('mysql://user:[email protected]/dbname',
                       pool_recycle=3600, pool_size=10)
session_factory = sessionmaker(bind=engine, autoflush=True, autocommit=False)
db_session = session_factory()
...
some code that may take several hours to run
...
db_session.execute('SELECT * FROM ' + P_TABLE + " WHERE id = '%s'" % id)
I thought that registering the check_connection function under the checkout event would solve it, but it didn't. Now the question is: how am I supposed to tell SQLAlchemy to handle connection dropouts, so that every time I call execute() it checks whether the connection is available and, if not, initiates it once again?
----UPDATE----
The version of SQLAlchemy is 0.7.4
----UPDATE----
from sqlalchemy import create_engine, event
from sqlalchemy.exc import DisconnectionError
from sqlalchemy.orm import sessionmaker

def checkout_listener(dbapi_con, con_record, con_proxy):
    try:
        try:
            dbapi_con.ping(False)
        except TypeError:
            dbapi_con.ping()
    except dbapi_con.OperationalError as exc:
        if exc.args[0] in (2006, 2013, 2014, 2045, 2055):
            raise DisconnectionError()
        else:
            raise

engine = create_engine(CONNECTION_URI, pool_recycle=3600, pool_size=10)
event.listen(engine, 'checkout', checkout_listener)
session_factory = sessionmaker(bind=engine, autoflush=True, autocommit=False)
db_session = session_factory()
The session_factory is passed to every newly created thread:
import threading
import Queue

from sqlalchemy.orm import scoped_session

class IncidentProcessor(threading.Thread):

    def __init__(self, queue, session_factory):
        if not isinstance(queue, Queue.Queue):
            raise TypeError("first argument should be of %s" % type(Queue.Queue))
        self.queue = queue
        self.db_session = scoped_session(session_factory)
        threading.Thread.__init__(self)

    def run(self):
        self.db_session().execute('SELECT * FROM ...')
        ...
        some code that takes a lot of time
        ...
        self.db_session().execute('SELECT * FROM ...')
Now, when execute runs after a long period of time, I get the "MySQL server has gone away" error.
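Note that the 'checkout' listener only runs when a connection is checked out from the pool, so a session that keeps holding the same connection across the long-running stretch never triggers it. One workaround is to release the connection before the pause so the next execute() performs a fresh checkout. A minimal runnable sketch, with an in-memory SQLite database standing in for the MySQL URL (the listener itself is omitted since SQLite connections have no ping()):

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine('sqlite://')   # stand-in for the real MySQL URL
db_session = scoped_session(sessionmaker(bind=engine))

first = db_session().execute(text('SELECT 1')).scalar()
db_session.remove()   # close the session; its connection returns to the pool

# ... long-running work; a pooled MySQL connection may die meanwhile ...

# this execute() checks a connection out of the pool again, so a 'checkout'
# listener registered on the engine would get a chance to ping first
second = db_session().execute(text('SELECT 2')).scalar()
db_session.remove()
```

scoped_session.remove() is the documented way to dispose of the thread-local session; after it, the next db_session() call transparently opens a fresh session.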
Upvotes: 7
Views: 16361
Reputation: 100786
Try the pool_recycle argument to create_engine.
From the documentation:
Connection Timeouts
MySQL features an automatic connection close behavior, for connections that have been idle for eight hours or more. To circumvent having this issue, use the pool_recycle option which controls the maximum age of any connection:
engine = create_engine('mysql+mysqldb://...', pool_recycle=3600)
Upvotes: 3
Reputation: 6796
There was a talk about this, and this doc describes the problem pretty nicely, so I used their recommended approach to handle such errors: http://discorporate.us/jek/talks/SQLAlchemy-EuroPython2010.pdf
It looks something like this:
from sqlalchemy import create_engine, event
from sqlalchemy.exc import DisconnectionError
def checkout_listener(dbapi_con, con_record, con_proxy):
try:
try:
dbapi_con.ping(False)
except TypeError:
dbapi_con.ping()
except dbapi_con.OperationalError as exc:
if exc.args[0] in (2006, 2013, 2014, 2045, 2055):
raise DisconnectionError()
else:
raise
db_engine = create_engine(DATABASE_CONNECTION_INFO,
pool_size=100,
pool_recycle=3600)
event.listen(db_engine, 'checkout', checkout_listener)
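To see when such a listener actually fires, here is a small self-contained sketch with an in-memory SQLite database standing in for MySQL: the 'checkout' hook runs every time a connection is taken from the pool, which is exactly why it can ping (or otherwise test) the connection before it is handed to the session.

```python
from sqlalchemy import create_engine, event, text

engine = create_engine('sqlite://')   # stand-in for DATABASE_CONNECTION_INFO
checkouts = []

@event.listens_for(engine, 'checkout')
def on_checkout(dbapi_con, con_record, con_proxy):
    # fires on every pool checkout, before the connection reaches the caller
    checkouts.append(1)

with engine.connect() as conn:
    conn.execute(text('SELECT 1'))
# checkouts is now non-empty: the listener ran before the query
```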
Upvotes: 10
Reputation: 4386
You can try something like this:
while True:
    try:
        db_session.execute('SELECT * FROM ' + PONY_TABLE + " WHERE id = '%s'" % incident_id)
        break
    except SQLAlchemyError:
        db_session.rollback()
If the connection has gone away, the execute will raise an exception; the session is then rolled back, and the retry is likely to succeed.
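One caveat: a bare while True retries forever if the failure is persistent. A bounded variant is safer; the session object and exception class below are illustrative stand-ins, not SQLAlchemy API:

```python
class StaleConnectionError(Exception):
    """Stand-in for SQLAlchemyError in this sketch."""

def execute_with_retry(session, statement, max_attempts=3):
    """Run statement, rolling back and retrying if the connection dropped."""
    for attempt in range(1, max_attempts + 1):
        try:
            return session.execute(statement)
        except StaleConnectionError:
            session.rollback()       # discard the dead connection/transaction
            if attempt == max_attempts:
                raise                # give up after max_attempts tries
```

On a transient dropout the first attempt fails, the rollback returns the dead connection to the pool, and the second attempt runs on a fresh one; a permanent outage still surfaces as an exception instead of an infinite loop.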
Upvotes: -1