Roman

Reputation: 3941

MySQL query errors when connecting from Celery task running on Heroku

I'm seeing wrong query results when executing queries against an external MySQL database, but only when connecting from Celery tasks running on Heroku. The same tasks, when run on my own machine, do not show these errors, and the errors only appear about half of the time (although when they fail, all tasks are wrong).

The tasks are managed by Celery via Redis, and the MySQL database does not itself run on Heroku. Both my local machine and Heroku connect to the same MySQL database.

I connect to the database with SQLAlchemy, using the pymysql driver:

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# in stats_config.py
DB_URI = 'mysql+pymysql://USER:PW@SERVER/DB'

engine = create_engine(stats_config.DB_URI, convert_unicode=True, echo_pool=True)
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()

The tasks are executed one by one.

Here is an example of a task that produces differing results:

@shared_task(bind=True, name="get_gross_revenue_task")
def get_gross_revenue_task(self, g_start_date, g_end_date, START_TIME_FORM):

    db_session.close()
    start_date = datetime.strptime(g_start_date, '%d-%m-%Y')
    end_date = datetime.strptime(g_end_date, '%d-%m-%Y')

    gross_rev_trans_VK = db_session.query(func.sum(UsersTransactionsVK.amount)).filter(UsersTransactionsVK.date_added >= start_date, UsersTransactionsVK.date_added <= end_date, UsersTransactionsVK.payed == 'Yes').scalar()
    gross_rev_trans_Stripe = db_session.query(func.sum(UsersTransactionsStripe.amount)).filter(UsersTransactionsStripe.date_added >= start_date, UsersTransactionsStripe.date_added <= end_date, UsersTransactionsStripe.payed == 'Yes').scalar()
    gross_rev_trans = db_session.query(func.sum(UsersTransactions.amount)).filter(UsersTransactions.date_added >= start_date, UsersTransactions.date_added <= end_date, UsersTransactions.on_hold == 'No').scalar()

    if gross_rev_trans_VK is None:
        gross_rev_trans_VK = 0

    if gross_rev_trans_Stripe is None:
        gross_rev_trans_Stripe = 0

    if gross_rev_trans is None:
        gross_rev_trans = 0

    print('gross', gross_rev_trans_VK, gross_rev_trans_Stripe, gross_rev_trans)

    total_gross_rev = gross_rev_trans_VK + gross_rev_trans_Stripe + gross_rev_trans

    return {'total_rev' : str(total_gross_rev / 100), 'current': 100, 'total': 100, 'statistic': 'get_gross_revenue', 'time_benchmark': (datetime.today() - START_TIME_FORM).total_seconds()}

# Selects gross revenue between selected dates
@app.route('/get-gross-revenue', methods=["POST"])
@basic_auth.required
@check_verified
def get_gross_revenue():
    if request.method == "POST":
        task = get_gross_revenue_task.apply_async([session['g_start_date'], session['g_end_date'], session['START_TIME_FORM']])
        return json.dumps({}), 202, {'Location': url_for('taskstatus_get_gross_revenue', task_id=task.id)}

These are simple and fast tasks, completing within a few seconds.

The tasks fail by producing small differences. For example, for a task where the correct result would be 30111, when things break the task produces 29811 instead. It is always the code that uses `db_session` that returns the wrong values.

What I tried:

Upvotes: 2

Views: 1962

Answers (1)

Martijn Pieters

Reputation: 1122412

You are re-using sessions between tasks in different workers. Create your session per Celery worker, or even per task.
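The per-task approach can be sketched as follows (an illustrative sketch, not code from the question; the function name is hypothetical and an in-memory SQLite URI stands in for the real DB_URI so it runs standalone). The session is built inside the task body and torn down in a finally block, so no connection state survives into the next task or across forked worker processes:

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

def get_gross_revenue_body(db_uri='sqlite://'):
    # Fresh engine and session per task run: nothing is shared
    # between tasks or across forked worker processes.
    engine = create_engine(db_uri)
    session = sessionmaker(bind=engine)()
    try:
        # Stand-in for the real revenue queries.
        return session.execute(text('SELECT 1')).scalar()
    finally:
        session.close()
        engine.dispose()  # also discard the pooled connections
```

Creating an engine per task has some overhead, which is why caching one per worker (below) is usually preferable.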

Know that task instances are actually persisted per worker process. You can use this to cache a session per task, so you don't have to recreate the session each time the task runs. This is easiest done with a custom task class; the Celery documentation uses database connection caching as an example there.

To do this with a SQLAlchemy session, use:

from celery import Task
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Session = scoped_session(sessionmaker(autocommit=True, autoflush=True))

class SQLASessionTask(Task):
    _session = None  # cached per worker process

    @property
    def session(self):
        if self._session is None:
            engine = create_engine(
                stats_config.DB_URI, convert_unicode=True, echo_pool=True)
            self._session = Session(bind=engine)
        return self._session

Use this as:

@shared_task(base=SQLASessionTask, bind=True, name="get_gross_revenue_task")
def get_gross_revenue_task(self, g_start_date, g_end_date, START_TIME_FORM):
    db_session = self.session
    # ... etc.

This creates a SQLAlchemy session for the current task only if it needs one, the moment you access self.session.
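If you also want the connection released between runs, Celery calls after_return on the task once it finishes, whatever the outcome. A self-contained sketch of that pattern (a stand-in Task class replaces celery.Task here so it runs without a broker, and an in-memory SQLite URI stands in for the real DB_URI; in real code inherit from celery.Task):

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

Session = scoped_session(sessionmaker())

class Task:
    """Stand-in for celery.Task, just enough to run this sketch."""

class SQLASessionCleanupTask(Task):
    _session = None  # cached per worker process

    @property
    def session(self):
        if self._session is None:
            # Hypothetical URI; use your real DB_URI here.
            engine = create_engine('sqlite://')
            self._session = Session(bind=engine)
        return self._session

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Called by Celery after every task run: closing the session
        # returns its connection to the pool while keeping the cached
        # session object usable for the next run.
        if self._session is not None:
            self._session.close()
```

Closing (rather than discarding) the session means the next task run transparently reopens a connection from the pool.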

Upvotes: 4
