Bato-Bair Tsyrenov
Reputation: 1194

Celery uses the default broker instead of Redis. Flask + Celery + factory pattern

The closest working answer I have found is this one: How to use Flask-SQLAlchemy in a Celery task

This question is aimed at people who actually use Python, Flask, the factory pattern and Celery together. Python is 2.7; the other packages are the latest versions as of today.

I am trying to avoid circular dependencies and do this the Flask way.

I have gone through ten pages of Google results and tried every solution I could find, but I could not solve this.

~/git/project celery -A app worker --loglevel=info

Celery still tries to connect to the default AMQP broker instead of Redis:

[2017-11-10 16:08:12,208: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.

Trying again in 32.00 seconds...

This happens no matter how I start the app. The relevant files follow.

app/extensions.py

from flask_marshmallow import Marshmallow
from flask_sqlalchemy import SQLAlchemy
from flask_mail import Mail
import flask
from celery import Celery


class FlaskCelery(Celery):
    def __init__(self, *args, **kwargs):

        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)
        print self._conf['broker_url']


celery = FlaskCelery()
db = SQLAlchemy()
ma = Marshmallow()
mail = Mail()

Note: the print statement in init_app outputs redis://localhost:6379/0

app/__init__.py

from flask import Flask, render_template

from app.extensions import db, ma, mail, celery
from celerytasks import save_mailbox_items, sumf
from config import config
from utils import encoding_utils


def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])

    # SQLAlchemy configuration
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://...'

    # Celery configuration
    app.config['BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['broker_url'] = 'redis://localhost:6379/0'
    app.config['celery_broker_url'] = 'redis://localhost:6379/0'
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

    register_extensions(app)

    return app


def register_extensions(app):
    db.init_app(app)
    with app.app_context():
        db.create_all()
    ma.init_app(app)
    mail.init_app(app)
    celery.init_app(app)

    from .api_v1 import api as api_v1_blueprint
    app.register_blueprint(api_v1_blueprint, url_prefix='/api/v1')

    @app.route('/', methods=['GET'])
    def index():
        return render_template('index.html')

./manager.py

import os

from flask_script import Manager

from app import create_app

app = create_app(os.getenv('APP_CONFIG', 'default'))
manager = Manager(app)


@manager.shell
def make_shell_context():
    return dict(app=app)


if __name__ == '__main__':
    manager.run()

Upvotes: 1

Views: 674

Answers (1)

AugBar
Reputation: 457

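When you run your Celery worker, it uses the instance created at module level with

celery = FlaskCelery()

Because that instance does not receive a Flask app as an argument, the constructor never reaches self.init_app(kwargs['app']). The worker also only imports the app package and never calls create_app(), so celery.init_app(app) does not run either, and the worker ends up with the default configuration.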
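To make the mechanism concrete, here is a toy model in plain Python. FakeCelery, DEFAULT_BROKER and the dict standing in for the Flask app are all made up for illustration; none of this is Celery's or Flask's real API. It only assumes that the worker imports the module-level instance without anyone calling create_app(), which is what the code in the question suggests.

```python
# Toy model only: FakeCelery and create_app are stand-ins, not the real
# Celery or Flask APIs. The default URL is copied from the log in the question.
DEFAULT_BROKER = 'amqp://guest:**@127.0.0.1:5672//'


class FakeCelery(object):
    """Mimics an extension that stays on defaults until init_app() runs."""

    def __init__(self, app=None):
        self.broker_url = DEFAULT_BROKER
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        # 'app' is a plain dict standing in for a Flask app's config.
        self.broker_url = app['broker_url']


# Module-level instance, like `celery = FlaskCelery()` in app/extensions.py.
celery = FakeCelery()


def create_app():
    """Stand-in for the Flask factory: the only place init_app() is called."""
    app = {'broker_url': 'redis://localhost:6379/0'}
    celery.init_app(app)
    return app


# What a process sees if it imports the instance but never calls the factory.
broker_seen_by_worker = celery.broker_url

# What manager.py sees, because it calls create_app() when it is imported.
create_app()
broker_after_factory = celery.broker_url
```

In this model the first value is still the AMQP default and only the second one is the Redis URL. The practical consequence is that whatever module the worker loads has to call create_app(), or otherwise call celery.init_app, before the worker reads its configuration.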

There are several ways to fix this:

  • instantiate the FlaskCelery object with a Flask instance passed to the constructor

  • in your FlaskCelery class, create a Flask app in __init__ if none is passed to the constructor.

For the latter option, this would give something like:

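import os  # needed for os.getenv below; other imports as in app/extensions.py


class FlaskCelery(Celery):
    def __init__(self, *args, **kwargs):

        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])
        else:
            # No app passed: build one with the factory. create_app must be
            # importable here; a top-level import in app/extensions.py would
            # be circular, because app/__init__.py imports this module.
            self.init_app(create_app(os.getenv('APP_CONFIG', 'default')))

    def patch_task(self):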
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)
        print self._conf['broker_url']

Upvotes: 0
