Reputation: 51
I am trying to set up Celery tasks. Our main app is Pyramid with SQLAlchemy.
So I have a task defined as:
from celery.contrib.methods import task
from apipython.celerytasks import celery
class Email():
def __init__(self, from_name, from_email, to_name, to_email, subject, html_body,
sendgrid_category=None):
self.from_name = from_name
self.from_email = from_email
self.to_name = to_name
self.to_email = to_email
self.subject = subject
self.body = None
self.html_body = html_body
self.sendgrid_category = sendgrid_category
class EmailService():
@task()
def task__send_smtp(self, email, from_user_id=None, to_user_id=None):
# send the email, not shown here
# EmailLog is a SQLAlchemy model
email_log = EmailLog(
email.subject,
email.html_body,
from_user_id=from_user_id,
to_user_id=to_user_id,
action_type=email.sendgrid_category)
DBSession.add(email_log)
transaction.commit()
And celerytasks.py I have:
from celery import Celery
celery = Celery('apipython.celery',
broker='sqla+mysql+mysqldb://root:[email protected]/gs?charset=utf8',
backend=None,
include=['apipython.services.NotificationService'])
if __name__ == '__main__':
celery.start()
It works - the task gets serialized and picked up.
However when I try to use SQLAlchemy / DBSession inside the task, I get an error:
UnboundExecutionError: Could not locate a bind configured on mapper Mapper|EmailLog|emaillogs or this Session
I understand the worker task is running on a separate process and need to have its settings, session, engine etc set up. So I have this:
@worker_init.connect
def bootstrap_pyramid(signal, sender):
import os
from pyramid.paster import bootstrap
sender.app.settings = bootstrap('development.ini')['registry'].settings
customize_settings(sender.app.settings)
engine = sqlalchemy.create_engine('mysql+mysqldb://root:[email protected]/gs?charset=utf8')
DBSession.configure(bind=engine)
Base.metadata.bind = engine
However I am still getting the same error.
DBSession and Base are defined in models.py as
DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
Base = declarative_base()
What step am I missing to make the models binding work?
Second question, can this code for creating session / binding work in celery's init, vs worker init?
(BTW I did try pyramid_celery but prefer to make plain celery work)
Thanks,
Upvotes: 2
Views: 1378