ericman

Reputation: 163

How can I test on_failure in Celery?

My Celery task has a base class where an on_failure method is implemented.

In my test, I patched one of the methods that the task calls so that it raises an exception, but on_failure is never called.

Base class

class BaseTask(celery.Task):
    abstract = True 

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print("error")

Task

@celery.task(bind=True, base=BaseTask)
def multiple(self, a, b):
    logic.method(a, b)

Test

@patch('tasks.logic.method')
def test_something(self, mock):
    # arrange
    mock.side_effect = NotImplementedError

    # act
    with self.assertRaises(NotImplementedError):
        multiple(1, 2)

When the task runs under Celery and an exception is raised, everything works as expected. CELERY_ALWAYS_EAGER is enabled.

How can I make on_failure run in the test?

Upvotes: 5

Views: 3013

Answers (2)

ziuu

Reputation: 386

You can make the task raise inside its run method, set task_eager_propagates=False, and then call result.get():

task_eager_propagates=False: the exception is not raised immediately when the task is executed eagerly.
result.get(): ensures that the task has finished (without it the test may be flaky).

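import logging
from unittest import mock

import celery
import pytest

# `app` (the Celery application) and `override_settings` (presumably
# django.test.override_settings) come from the surrounding project.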


class BaseTask(celery.Task):
    abstract = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logging.error("on_failure called")


class Foo():
    def bar(*args):
        pass


@app.task(base=BaseTask)
def multiple(a, b):
    Foo().bar(a, b)


@override_settings(task_eager_propagates=False)
@mock.patch.object(Foo, 'bar', side_effect=NotImplementedError)
def test_something(mock_, caplog):

    result = multiple.delay(1, 2)

    with pytest.raises(NotImplementedError):
        result.get()

    assert "on_failure called" in caplog.messages

Upvotes: 0

Paolo Cozzi

Reputation: 71

From a discussion on an issue in the Celery GitHub repository: testing that on_failure gets called is "already done on the Celery level (verifying if on_failure is called)", and the advice is to "write a test to test whatever your on_failure does instead". You could move the work done by on_failure into a separate function and test that, or call on_failure directly on the task:

from unittest import TestCase
from billiard.einfo import ExceptionInfo

class TestTask(TestCase):
    def test_on_failure(self):
        "Testing on failure method"

        exc = Exception("Test")
        task_id = "test_task_id"
        args = ["argument 1"]
        kwargs = {"key": "value"}
        einfo = ExceptionInfo

        # call on_failure method
        multiple.on_failure(exc, task_id, args, kwargs, einfo)

        # assert something happened

ExceptionInfo is the same type of object celery uses; multiple is your task as you defined it in your question.
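The other option mentioned above, moving the work into a function of its own, might look like the sketch below. The helper name report_failure and the logger name tasks are hypothetical and not part of the original answer; the point is only that the helper can be tested without Celery at all.

```python
import logging
import unittest

logger = logging.getLogger("tasks")  # hypothetical logger name


def report_failure(exc, task_id):
    """Hypothetical helper holding the work on_failure would delegate to."""
    logger.error("task %s failed: %r", task_id, exc)


class ReportFailureTest(unittest.TestCase):
    def test_logs_the_failure(self):
        # assertLogs captures records emitted on the "tasks" logger.
        with self.assertLogs("tasks", level="ERROR") as captured:
            report_failure(ValueError("boom"), "test_task_id")

        self.assertIn("test_task_id", captured.output[0])
        self.assertIn("boom", captured.output[0])
```

The task's on_failure would then contain a single call such as report_failure(exc, task_id), leaving little Celery-specific behaviour to cover in your own tests.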

Hope this helps

Upvotes: 3
