Reputation: 3928
I'm trying to process an entire CSV file as fast as possible, so I want to process each line in parallel as a Celery task. The cleanup, which is also a Celery task, has to wait until every line has been processed. See the example below.
The problem is that I can't get through a file because I keep running into connection errors with MySQL. So far, I've seen these two errors:
2013, 'Lost connection to MySQL server during query'
2006, 'MySQL server has gone away'
from app.db.meta import Session
from celery import chord, Celery
from celery.signals import task_postrun

celery = Celery()
celery.config_from_object('config')

@task_postrun.connect
def close_session(*args, **kwargs):
    Session.remove()

def main():
    # process each line in parallel
    header = [process_line.s(line) for line in csv_file]
    # pass stats to cleanup after all lines are processed
    callback = cleanup.s()
    chord(header)(callback)

@celery.task
def process_line(line):
    session = Session()
    ...
    # process line
    ...
    return stats

@celery.task
def cleanup(stats):
    session = Session()
    ...
    # do cleanup and log stats
    ...
I'm using Celery 3.1.18 and SQLAlchemy 0.9.9, with connection pooling enabled.
mysql> SHOW FULL PROCESSLIST;
+----+------+-----------+-----------------+---------+------+-------+-----------------------+
| Id | User | Host      | db              | Command | Time | State | Info                  |
+----+------+-----------+-----------------+---------+------+-------+-----------------------+
|  1 | root | localhost | ab__development | Sleep   | 4987 |       | NULL                  |
| 11 | root | localhost | ab__development | Sleep   | 1936 |       | NULL                  |
| 16 | root | localhost | ab__development | Sleep   |  143 |       | NULL                  |
| 17 | root | localhost | ab__development | Sleep   | 1045 |       | NULL                  |
| 18 | root | localhost | NULL            | Query   |    0 | init  | SHOW FULL PROCESSLIST |
| 21 | root | localhost | ab__development | Sleep   |    7 |       | NULL                  |
+----+------+-----------+-----------------+---------+------+-------+-----------------------+
6 rows in set (0.01 sec)
Upvotes: 0
Views: 3402
Reputation: 474
Read the answer. In short, you have to either disable SQLAlchemy's connection pool or ping the MySQL server each time a connection is checked out of the pool:
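# The flask.ext.* namespace was removed in Flask 1.0; import the package directly.
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import event, exc

def instance(app):
    """:rtype: SQLAlchemy"""
    db = SQLAlchemy(app)

    if app.testing:
        return db

    # Runs every time a connection is taken from the pool.
    @event.listens_for(db.engine, 'checkout')
    def checkout(dbapi_con, con_record, con_proxy):
        try:
            try:
                # Ping without asking the driver to reconnect automatically.
                dbapi_con.ping(False)
            except TypeError:
                # Fallback for drivers whose ping() takes no argument.
                app.logger.debug('MySQL connection died. Restoring...')
                dbapi_con.ping()
        except dbapi_con.OperationalError as e:
            app.logger.warning(e)
            if e.args[0] in (2006, 2013, 2014, 2045, 2055):
                # Tell the pool to discard this connection and retry with a new one.
                raise exc.DisconnectionError()
            else:
                raise

    return db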
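The other option mentioned above, disabling the pool, could look roughly like the sketch below. It is only an illustration under a few assumptions: `DB_URL` is a placeholder name, and an in-memory SQLite URL stands in for the real MySQL URL so the snippet runs anywhere. The `Session` here is a guess at what `app.db.meta` in the question might define, not its actual contents.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

# Assumption: placeholder URL. A real setup would use the MySQL URL from the
# app's config, for example "mysql://user:password@host/dbname".
DB_URL = "sqlite://"

# NullPool opens a new connection on every checkout and closes it on release,
# so a task never picks up a pooled connection the server has already dropped.
engine = create_engine(DB_URL, poolclass=NullPool)
Session = scoped_session(sessionmaker(bind=engine))

session = Session()
result = session.execute(text("SELECT 1")).scalar()
Session.remove()  # releases the session; with NullPool the connection is closed
```

The trade-off is that every task pays the cost of opening a fresh connection, which may matter when each task handles a single CSV line.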
Upvotes: 1