Reputation: 9715
I have a couple of celery tasks that are included in my Django tests. Unfortunately exceptions are not thrown when tasks are invoked via .delay(). I am setting CELERY_ALWAYS_EAGER to True.
tasks.py
import celeryapp as app
@app.task()
def exception_task():
print 'CELERY_ALWAYS_EAGER:', app.conf['CELERY_ALWAYS_EAGER']
raise Exception('foo')
tests.py
def test_exception_in_task(self):
from tasks import exception_task
exception_task.delay()
Output
CELERY_ALWAYS_EAGER: True
.
----------------------------------------------------------------------
Ran 1 test in 0.686s
When removing the .delay the test exits with an error as excpected:
ERROR: test_exception_in_task
Exception: foo
Versions
celery==3.1.4
Django==1.6.4
Upvotes: 13
Views: 2011
Reputation: 9715
Seems I additionally had to set CELERY_EAGER_PROPAGATES_EXCEPTIONS to True.
Celery >=5.3.1 it is CELERY_TASK_EAGER_PROPAGATES
.
Upvotes: 17
Reputation: 1282
@override_settings(task_always_eager=True, task_eager_propagates=True)
def test_my_task(self):
pass
is what worked for me using Celery 5.
Documentation on config name changes: https://docs.celeryq.dev/en/stable/userguide/configuration.html
Upvotes: 0
Reputation: 5742
Under celery 4.0, I had to use CELERY_TASK_EAGER_PROPAGATES
Upvotes: 5