Jim

Reputation: 14270

Why are Celery task test results inconsistent?

I've written two simple integration tests for two Celery tasks, but when I run them I get inconsistent results. I can run them one minute and one or both will pass, then run them again a couple of seconds later and one or both will fail. Why are the results inconsistent from one test run to the next? Also, are these tests actually testing that the Celery task is being sent to a queue and executed by a worker?

Thanks!

Here are the tasks:

# apps/photos/tasks.py
from __future__ import absolute_import
from conf.celeryapp import app

@app.task
def hello():
    return 'Hello world!'

@app.task
def add(x, y):
    return x + y

Here are the tests:

# apps/photos/tests/task_tests.py
from django.test import TestCase
from django.test.utils import override_settings
from apps.photos.tasks import hello, add

class TaskTestIT(TestCase):
    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                       CELERY_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')
    def test_hello(self):
        result = hello.delay()
        self.assertTrue(result.successful())
        self.assertEqual(str(result.result), 'Hello world!')

    def test_add(self):
        result = add.delay(1, 1)
        self.assertTrue(result.successful())
        self.assertEqual(result.get(), 2)

I run my tests with this command:

./manage.py test -s

I use django-nose as my test runner:

# conf/settings/base.py
USE_DJANGO_NOSE = True
if USE_DJANGO_NOSE:
    INSTALLED_APPS += ( 'django_nose', )
    TEST_RUNNER = 'django_nose.NoseTestSuiteRunner'

Here are my Celery app and configuration files:

# conf/celeryapp.py
from celery import Celery

app = Celery('celeryapp')
app.config_from_object('conf.celeryconfig')
app.autodiscover_tasks(['apps.photos'])

# conf/celeryconfig.py
from kombu import Queue, Exchange

BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('photos', Exchange('photos'), routing_key='photos'),
    Queue('mail', Exchange('mail'), routing_key='mail'),
)
CELERY_ROUTES = (
    {'apps.photos.tasks.hello': {'queue': 'default', 'routing_key': 'default'}},
    {'apps.photos.tasks.add': {'queue': 'photos', 'routing_key': 'photos'}},
    {'apps.photos.tasks.send_email': {'queue': 'mail', 'routing_key': 'mail'}},
)

Upvotes: 2

Views: 407

Answers (1)

Spc_555

Reputation: 4121

Task.delay() doesn't return the task's actual result; it returns an AsyncResult object, which will hold the result once the task has executed. Your results vary because sometimes the task finishes before your test starts checking its result, and sometimes it takes longer. That depends on the load on your system, among other things.

What you should do is call result.get() first, which waits for the task to finish executing, and only then check whether it was .successful(), etc.

For example, the following should yield consistent results:

def test_hello(self):
    result = hello.delay()
    result.get()
    self.assertTrue(result.successful())
    self.assertEqual(str(result.result), 'Hello world!')
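
To see the timing issue in isolation, here is a minimal sketch that uses the standard library's concurrent.futures as a stand-in for Celery. It is an analogy, not Celery itself: Future.done() plays the role of checking the task's state, and Future.result() plays the role of AsyncResult.get(). The names slow_hello, check_immediately and check_after_waiting are made up for this illustration, and the 0.2 second sleep is an assumed delay standing in for the time a worker needs.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_hello():
    # Hypothetical stand-in for a task running on a worker:
    # it needs a moment before its result is available.
    time.sleep(0.2)
    return 'Hello world!'


def check_immediately(executor):
    # Mirrors the flaky test: inspect the state right after submitting,
    # without waiting for the work to finish.
    future = executor.submit(slow_hello)
    return future.done()


def check_after_waiting(executor):
    # Mirrors the suggested fix: block until the work has finished
    # (the analogue of result.get()), then inspect the state.
    future = executor.submit(slow_hello)
    value = future.result(timeout=5)
    return future.done(), value


with ThreadPoolExecutor(max_workers=2) as executor:
    early = check_immediately(executor)
    late, value = check_after_waiting(executor)

print('done when checked immediately:', early)
print('done after waiting:', late, '| value:', value)
```

Checking the state right after submitting depends on how far the background work has got, while checking after the blocking call does not. The same ordering applies to result.get() and result.successful() in the test above.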

Upvotes: 2
