Jonathan
Jonathan

Reputation: 51

Pyramid / SQLAlchemy model binding in Celery tasks

I am trying to set up Celery tasks. Our main app is Pyramid with SQLAlchemy.

So I have a task defined as:

from celery.contrib.methods import task
from apipython.celerytasks import celery

class Email():
    def __init__(self, from_name, from_email, to_name, to_email, subject, html_body,
                 sendgrid_category=None):
        self.from_name = from_name
        self.from_email = from_email
        self.to_name = to_name
        self.to_email = to_email
        self.subject = subject
        self.body = None
        self.html_body = html_body
        self.sendgrid_category = sendgrid_category

class EmailService():
    @task()
    def task__send_smtp(self, email, from_user_id=None, to_user_id=None):
        # send the email, not shown here

        # EmailLog is a SQLAlchemy model
        email_log = EmailLog(
                        email.subject,
                        email.html_body,
                        from_user_id=from_user_id,
                        to_user_id=to_user_id,
                        action_type=email.sendgrid_category)
        DBSession.add(email_log)

        transaction.commit()

And celerytasks.py I have:

from celery import Celery

celery = Celery('apipython.celery',
                broker='sqla+mysql+mysqldb://root:[email protected]/gs?charset=utf8',
                backend=None,
                include=['apipython.services.NotificationService'])

if __name__ == '__main__':
    celery.start()

It works - the task gets serialized and picked up.

However when I try to use SQLAlchemy / DBSession inside the task, I get an error:

UnboundExecutionError: Could not locate a bind configured on mapper Mapper|EmailLog|emaillogs or this Session

I understand the worker task is running on a separate process and need to have its settings, session, engine etc set up. So I have this:

@worker_init.connect
def bootstrap_pyramid(signal, sender):
    import os
    from pyramid.paster import bootstrap
    sender.app.settings = bootstrap('development.ini')['registry'].settings

    customize_settings(sender.app.settings)

    engine = sqlalchemy.create_engine('mysql+mysqldb://root:[email protected]/gs?charset=utf8')
    DBSession.configure(bind=engine)
    Base.metadata.bind = engine

However I am still getting the same error.

DBSession and Base are defined in models.py as

DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
Base = declarative_base()

What step am I missing to make the models binding work?

Second question, can this code for creating session / binding work in celery's init, vs worker init?

(BTW I did try pyramid_celery but prefer to make plain celery work)

Thanks,

Upvotes: 2

Views: 1378

Answers (1)

Jonathan
Jonathan

Reputation: 51

My colleague tried the exact same code and it worked. Strange

Upvotes: 1

Related Questions