Reputation: 163
My celery task has a base class were an on_failure
method is implemented.
In my test, I patched one of the methods that the task is calling to, to raise an exception but on_faliure
is never called.
Base class
class BaseTask(celery.Task):
abstract = True
def on_failure(self, exc, task_id, args, kwargs, einfo):
print("error")
Task
@celery.task(bind=True, base=BaseTask)
def multiple(self, a, b):
logic.method(a, b)
Test
@patch('tasks.logic.method')
def test_something(self, mock):
# arrange
mock.side_effect = NotImplementedError
# act
with self.assertRaises(NotImplementedError):
multiple(1, 2)
When running celery and an exception is raised everything works fine.
CELERY_ALWAYS_EAGER
is activated.
how can I make on_faliure
run?
Upvotes: 5
Views: 3013
Reputation: 386
You can simulate the exception in the run
method + task_eager_propagates=False
+ result.get
task_eager_propagates=False
- won't raise the exception immediately
result.get
- assures that task is finished (without that test may be flaky)
import celery
import logging
class BaseTask(celery.Task):
abstract = True
def on_failure(self, exc, task_id, args, kwargs, einfo):
logging.error("on_failure called")
class Foo():
def bar(*args):
pass
@app.task(base=BaseTask)
def multiple(a, b):
Foo().bar(a, b)
@override_settings(task_eager_propagates=False)
@mock.patch.object(Foo, 'bar', side_effect=NotImplementedError)
def test_something(mock_, caplog):
result = multiple.delay(1, 2)
with pytest.raises(NotImplementedError):
result.get()
assert "on_failure called" in caplog.messages
Upvotes: 0
Reputation: 71
From a discussion on a issue in celery GitHub: on_failure
test is "already done on the Celery level (verifying if on_failure is called)" and "write a test to test whatever your on_failure does instead". You could define a function inside the on_failure
method and test it, or call on_failure
like a classmethod:
import TestCase
from billiard.einfo import ExceptionInfo
class TestTask(TestCase):
def test_on_failure(self):
"Testing on failure method"
exc = Exception("Test")
task_id = "test_task_id"
args = ["argument 1"]
kwargs = {"key": "value"}
einfo = ExceptionInfo
# call on_failure method
multiple.on_failure(exc, task_id, args, kwargs, einfo)
# assert something appened
ExceptionInfo
is the same type of object celery
uses; multiple
is your task as you defined it in your question.
Hope this helps
Upvotes: 3