kev

Reputation: 9715

Celery tasks not throwing exception in Django Tests

I have a couple of Celery tasks that are exercised in my Django tests. Unfortunately, exceptions are not raised when the tasks are invoked via .delay(), even though I am setting CELERY_ALWAYS_EAGER to True.

tasks.py

import celeryapp as app

@app.task()
def exception_task():
    print 'CELERY_ALWAYS_EAGER:', app.conf['CELERY_ALWAYS_EAGER']
    raise Exception('foo')

tests.py

def test_exception_in_task(self):
    from tasks import exception_task
    exception_task.delay()

Output

CELERY_ALWAYS_EAGER: True
.
----------------------------------------------------------------------
Ran 1 test in 0.686s

When I remove the .delay, the test exits with an error as expected:

ERROR: test_exception_in_task
Exception: foo

Versions

celery==3.1.4
Django==1.6.4

Upvotes: 13

Views: 2011

Answers (3)

kev

Reputation: 9715

It seems I additionally had to set CELERY_EAGER_PROPAGATES_EXCEPTIONS to True.

In Celery >=5.3.1 the setting is CELERY_TASK_EAGER_PROPAGATES.
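For the versions in the question (celery 3.1.4, Django 1.6.4), a minimal sketch of the test settings might look like this. The file name settings.py and the idea of keeping these flags in a test-only settings module are assumptions; only the two setting names come from the question and this answer.

```python
# settings.py fragment (test settings only), Celery 3.1-style names.

# Run tasks synchronously in the calling process instead of sending
# them to a broker.
CELERY_ALWAYS_EAGER = True

# Re-raise exceptions from eagerly executed tasks in the caller,
# so a failing task makes the test fail.
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
```

With both flags set, exception_task.delay() in the test from the question should raise Exception('foo') instead of passing silently.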

Upvotes: 17

Wei

Reputation: 1282

from django.test import override_settings

@override_settings(task_always_eager=True, task_eager_propagates=True)
def test_my_task(self):
    pass

is what worked for me using Celery 5.

Documentation on config name changes: https://docs.celeryq.dev/en/stable/userguide/configuration.html

Upvotes: 0

Ross Patterson

Reputation: 5742

Under Celery 4.0, I had to use CELERY_TASK_EAGER_PROPAGATES.
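For reference, a minimal sketch of the matching test settings with the Celery 4.0+ names. It assumes the common Django setup where the Celery app loads its config with the CELERY namespace; that config_from_object line is an assumption and does not appear in the question. Without a namespace, the lowercase names task_always_eager and task_eager_propagates apply instead.

```python
# settings.py fragment (test settings only), Celery 4.0+ names.
# Assumes the Celery app was configured with something like:
#   app.config_from_object('django.conf:settings', namespace='CELERY')

# Run tasks synchronously in the calling process.
CELERY_TASK_ALWAYS_EAGER = True

# Re-raise task exceptions in the caller when running eagerly.
CELERY_TASK_EAGER_PROPAGATES = True
```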

Upvotes: 5
