Reputation: 1194
The closest working answer I have found is this one: How to use Flask-SQLAlchemy in a Celery task
This question is aimed at someone who is actually using Python, Flask, the application factory pattern and Celery. Python is 2.7; everything else is the latest version as of today.
I am trying to avoid circular dependencies and do it the Flask way.
I have gone through ten pages of Google results and tried every solution I could find, but I could not solve this.
~/git/project celery -A app worker --loglevel=info
Celery still tries to connect to the default AMQP broker:
[2017-11-10 16:08:12,208: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.
Trying again in 32.00 seconds...
This happens despite my various attempts at starting the app.
app/extensions.py
from flask.ext.marshmallow import Marshmallow
from flask.ext.sqlalchemy import SQLAlchemy
from flask_mail import Mail
import flask
from celery import Celery
class FlaskCelery(Celery):
    """Celery application whose tasks run inside a Flask application context.

    The Flask application can be handed over at construction time through
    the ``app`` keyword argument, or attached afterwards via ``init_app``.
    """

    def __init__(self, *args, **kwargs):
        """Create the Celery app and install the context-aware task base.

        When an ``app`` keyword argument is present, it is bound right away
        through ``init_app``.
        """
        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()
        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        """Swap ``self.Task`` for a subclass that guarantees a Flask app context."""
        base_task = self.Task
        celery_app = self

        class ContextTask(base_task):
            abstract = True

            def __call__(self, *args, **kwargs):
                # A Flask context is already active: call straight through.
                if flask.has_app_context():
                    return base_task.__call__(self, *args, **kwargs)
                # No context yet: push one from the Flask app bound in init_app().
                with celery_app.app.app_context():
                    return base_task.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        """Store the Flask ``app`` and load Celery settings from ``app.config``."""
        self.app = app
        self.config_from_object(app.config)
        # Debug output of the broker URL; parenthesised so the statement is
        # valid on both Python 2 and 3 (same output for a single argument).
        print(self._conf['broker_url'])
# Module-level extension instances, created here without a Flask app.
# NOTE(review): `celery` is constructed with no `app` keyword, so
# FlaskCelery.init_app() is not invoked at this point.
celery = FlaskCelery()
db = SQLAlchemy()
ma = Marshmallow()
mail = Mail()
Note: print self._conf['broker_url'] outputs: redis://localhost:6379/0
app/__init__.py
from flask import Flask, render_template
from app.extensions import db, ma, mail, celery
from celerytasks import save_mailbox_items, sumf
from config import config
from utils import encoding_utils
def create_app(config_name):
    """Application factory: build, configure and return the Flask app.

    ``config_name`` selects an entry of the imported ``config`` mapping,
    which is loaded into ``app.config`` before the hard-coded overrides
    below are applied.
    """
    app = Flask(__name__)
    app.config.from_object(config[config_name])

    # SQLAlchemy configuration
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://...'

    # Celery configuration: the same Redis URL is stored under every key
    # spelling listed here, in this order.
    redis_url = 'redis://localhost:6379/0'
    celery_keys = (
        'BROKER_URL',
        'broker_url',
        'celery_broker_url',
        'CELERY_BROKER_URL',
        'CELERY_RESULT_BACKEND',
    )
    for key in celery_keys:
        app.config[key] = redis_url

    register_extensions(app)
    return app
def register_extensions(app):
    """Bind the shared extensions to ``app`` and register its blueprint and index route."""
    db.init_app(app)
    # Table creation is run inside an application context for this app.
    with app.app_context():
        db.create_all()

    ma.init_app(app)
    mail.init_app(app)
    celery.init_app(app)

    # The blueprint is imported at call time, after the extensions are bound.
    from .api_v1 import api as v1_api
    app.register_blueprint(v1_api, url_prefix='/api/v1')

    @app.route('/', methods=['GET'])
    def index():
        """Serve the landing page template."""
        return render_template('index.html')
./manager.py
import os
from flask.ext.script import Manager
from app import create_app
# Build the application from the configuration named by APP_CONFIG
# (falling back to 'default') and wrap it in a Flask-Script manager.
app = create_app(os.getenv('APP_CONFIG', 'default'))
manager = Manager(app)


@manager.shell
def make_shell_context():
    """Return the names made available in the interactive shell."""
    return {'app': app}


if __name__ == '__main__':
    manager.run()
Upvotes: 1
Views: 674
Reputation: 457
When you run your Celery worker, it will use the Celery instance created with
celery = FlaskCelery()
But because that instance does not receive a Flask app as an argument, you never go through self.init_app(kwargs['app']), and thus it will use the default configuration.
Several options are possible to fix this here:
Instantiate the FlaskCelery object with a Flask instance passed to it (as the app keyword argument).
In your FlaskCelery class, create a Flask app in the __init__ function if none is passed to the constructor.
For the latter option, this would give something like:
class FlaskCelery(Celery):
    """Celery application that always binds itself to a Flask application.

    If no ``app`` keyword argument is supplied, a Flask app is built with
    ``create_app`` using the configuration named by the ``APP_CONFIG``
    environment variable (``'default'`` when unset).
    """

    def __init__(self, *args, **kwargs):
        """Create the Celery app, patch the task base and bind a Flask app."""
        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()
        # NOTE(review): `os` and `create_app` must be in scope in this module.
        # If `create_app` lives in a package that itself imports this module,
        # importing it here may be circular -- confirm against the real layout.
        if 'app' in kwargs:
            flask_app = kwargs['app']
        else:
            flask_app = create_app(os.getenv('APP_CONFIG', 'default'))
        self.init_app(flask_app)

    def patch_task(self):
        """Swap ``self.Task`` for a subclass that guarantees a Flask app context."""
        base_task = self.Task
        celery_app = self

        class ContextTask(base_task):
            abstract = True

            def __call__(self, *args, **kwargs):
                # A Flask context is already active: call straight through.
                if flask.has_app_context():
                    return base_task.__call__(self, *args, **kwargs)
                # No context yet: push one from the bound Flask app.
                with celery_app.app.app_context():
                    return base_task.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        """Store the Flask ``app`` and load Celery settings from ``app.config``."""
        self.app = app
        self.config_from_object(app.config)
        # Debug output of the broker URL; parenthesised so the statement is
        # valid on both Python 2 and 3 (same output for a single argument).
        print(self._conf['broker_url'])
Upvotes: 0