Dr. Funkenstein
Dr. Funkenstein

Reputation: 466

Pytest, Flask, Celery - Celery NoneType Error

I have a Flask application using Celery, and the async processing works fine when the app is running locally. However, when I try to test (pytest) a route that uses Celery tasks, I get this error:

app/bp_dir/routes.py:12: in <module>
    from app import db, celery_tasks
app/celery_tasks.py:13: in <module>
    @celery.task()
E   AttributeError: 'NoneType' object has no attribute 'task'

It seems that Celery is not being spun up correctly when I'm testing. I think the underlying question is how to set up pytest fixtures so that routes importing Celery tasks can be tested.

app/celery_tasks.py looks like this:

from app import celeryapp

celery = celeryapp.celery

@celery.task()
def example_task():
    time.sleep(3)
    return 1

app/init.py

...
from app import celeryapp


def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(Config)

    ...

    # Celery
    celery = celeryapp.create_celery_app(app)
    celeryapp.celery = celery

    ...


    return app

app/celeryapp/init.py

from celery import Celery

CELERY_TASK_LIST = [
    'app.celery_tasks',
]

db_session = None
celery = None

def create_celery_app(_app=None):
    """
    Create a new Celery object and tie together the Celery config to the app's config.
    Wrap all tasks in the context of the Flask application.
    :param _app: Flask app
    :return: Celery app
    """

    from app import db

    celery = Celery(_app.import_name,
                    backend=_app.config['CELERY_BACKEND_URL'],
                    broker=_app.config['CELERY_BROKER_URL'],
                    include=CELERY_TASK_LIST)

    celery.conf.update(_app.config)
    always_eager = _app.config['TESTING'] or False
    celery.conf.update({'TASK_ALWAYS_EAGER': always_eager,
                        'CELERY_RESULT_BACKEND': 'redis'})

    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            if not celery.conf.CELERY_ALWAYS_EAGER:
                with _app.app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
            else:
                # special pytest setup
                db.session = db_session
                return TaskBase.__call__(self, *args, **kwargs)

        def after_return(self, status, retval, task_id, args, kwargs, einfo):
            """
            After each Celery task, teardown our db session.
            FMI: https://gist.github.com/twolfson/a1b329e9353f9b575131
            Flask-SQLAlchemy uses create_scoped_session at startup which avoids any setup on a
            per-request basis. This means Celery can piggyback off of this initialization.
            """
            if _app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN']:
                if not isinstance(retval, Exception):
                    db.session.commit()

            # If we aren't in an eager request (i.e. Flask will perform teardown), then teardown
            if not celery.conf.CELERY_ALWAYS_EAGER:
                db.session.remove()

    celery.Task = ContextTask


    return celery

app/celeryapp/celery_worker.py

from app import celeryapp, create_app

app = create_app()
celery = celeryapp.create_celery_app(app)
celeryapp.celery = celery

The code for the Celery set up is mostly borrowed from this [repo]https://github.com/kwiersma/flask-celery-sqlalchemy. The repo has tests [here]https://github.com/kwiersma/flask-celery-sqlalchemy/tree/master/tests, but I can't really see what I'm missing.

/testing_fixtures.py

import re
import flask
import pytest
from app import create_app, db

EMAIL = '[email protected]'
PASSWORD = 'Password123!'

def get_codes(data_structure):
    return [(str(x)) for x in range(len(data_structure))]

def extract_csrf_token(response: flask.Response) -> str:
    # is there better way to get CSRF token? I couldn't find one
    return re.search('name="csrf_token" type="hidden" value="([^"]*)"', str(response.data)).group(1)

@pytest.fixture
def flask_app():
    app = create_app()
    app.config['SQLALCHEMY_DATABASE_URI'] = "sqlite://"
    app.config['TESTING'] = True
    app.config['SECRET_KEY'] = 'this is crucial for storing section'
    app.testing = True

    return app

@pytest.fixture
def app_context(flask_app):
    with flask_app.app_context():
        yield

@pytest.fixture
def setup_db(app_context):
    db.create_all()

@pytest.fixture
def test_client(setup_db, flask_app):
    with flask_app.test_client() as c:
        yield c

@pytest.fixture
def registered_user(test_client, flask_app):
    resp = test_client.get("/auth/register")  # to get csrf_token
    csrf_token = extract_csrf_token(resp)

    # follow_redirect=False is important - otherwise you won't be able to tell the difference between successful
    # and faulty registration
    resp = test_client.post('/auth/register',
                            data={'email': EMAIL, 'password': PASSWORD, 'password2': PASSWORD,
                                  'csrf_token': csrf_token, 'is_test':True},
                            follow_redirects=False)
    assert resp.status_code == 302, "/auth/register should redirect on successful registering"

@pytest.fixture
def logged_client(registered_user, test_client):
    resp = test_client.get("/auth/login")  # to get csrf_token
    csrf_token = extract_csrf_token(resp)

    resp = test_client.post('/auth/login', data={'email': EMAIL, 'password': PASSWORD, 'csrf_token': csrf_token})

    assert resp.status_code == 302, "/auth/login should redirect to another page on successful login"

    yield test_client

    resp = test_client.get('/auth/logout')
    assert resp.status_code == 302

Example test:

from testing_fixtures import *

def test_example_route(logged_client):
    response = logged_client.get('/example_route')
    assert response.status_code == 200

Example route:

from app import db, celery_tasks
...

@example_bp.route('/example_route')
def example_route():

    return 1

Upvotes: 0

Views: 1333

Answers (1)

Dave W. Smith
Dave W. Smith

Reputation: 24966

A bit of a tricky knot to untie, so I'll offer a slight restructuring.

First, note that you don't need an instance of Celery to declare tasks.

import celery

@celery.task
def mytask():
    ...

is sufficient.

Now, consider creating an instance of Celery in app/__init__.py, and defer initializing it until create_app() is invoke. Something like

celery = Celery(__name__)

def create_app(config_class=...):
    app = Flask(__name__)
    app.config.from_object(config_class)
    ...
    celery.config_from_object(config_class)

might let you do away with celeryapp entirely, avoiding most opportunities for import order fouling you up.

I have a working example you can crib from here, or look at what the Flask Mega Tutorial does when it introduces Rq, which an alternative to Celery.

Upvotes: 1

Related Questions