Reputation: 14270
I've written two simple integration tests for two Celery tasks, but when I run them I get inconsistent results. I can run them one minute and one or both will pass, then run them again a couple of seconds later and one or both will fail. Why are these results inconsistent from one test run to the next? Also, are these tests actually testing that the Celery task is being sent to a queue and executed by a worker?
Thanks!
Here are the tasks:
# apps/photos/tasks.py
from __future__ import absolute_import
from conf.celeryapp import app

@app.task
def hello():
    return 'Hello world!'

@app.task
def add(x, y):
    return x + y
Here are the tests:
# apps/photos/tests/task_tests.py
from django.test import TestCase
from django.test.utils import override_settings
from apps.photos.tasks import hello, add

class TaskTestIT(TestCase):
    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                       CELERY_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')
    def test_hello(self):
        result = hello.delay()
        self.assertTrue(result.successful())
        self.assertEqual(str(result.result), 'Hello world!')

    def test_add(self):
        result = add.delay(1, 1)
        self.assertTrue(result.successful())
        self.assertEquals(result.get(), 2)
I run my tests with this command:
./manage.py test -s
I use django-nose as my test runner:
# conf/settings/base.py
USE_DJANGO_NOSE = True
if USE_DJANGO_NOSE:
    INSTALLED_APPS += ('django_nose',)
    TEST_RUNNER = 'django_nose.NoseTestSuiteRunner'
Here are my Celery app and configuration files:
# conf/celeryapp.py
from celery import Celery
app = Celery('celeryapp')
app.config_from_object('conf.celeryconfig')
app.autodiscover_tasks(['apps.photos'])
# conf/celeryconfig.py
from kombu import Queue, Exchange
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('photos', Exchange('photos'), routing_key='photos'),
    Queue('mail', Exchange('mail'), routing_key='mail'),
)
CELERY_ROUTES = (
    {'apps.photos.tasks.hello': {'queue': 'default', 'routing_key': 'default'}},
    {'apps.photos.tasks.add': {'queue': 'photos', 'routing_key': 'photos'}},
    {'apps.photos.tasks.send_email': {'queue': 'mail', 'routing_key': 'mail'}},
)
Upvotes: 2
Views: 407
Reputation: 4121
Task.delay() doesn't return the actual result of the task; it returns an AsyncResult object that will hold the result once the task has executed. Your results vary because sometimes the task finishes before your test checks its result, and sometimes it takes longer. That depends on the load of your system, among other things.
What you should do is call result.get() first, which waits for the task to finish executing, and only then check whether it was .successful() and so on.
For example, the following should yield consistent results:
def test_hello(self):
    result = hello.delay()
    result.get()
    self.assertTrue(result.successful())
    self.assertEqual(str(result.result), 'Hello world!')
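To see why the order of the checks matters without needing a broker, here is a rough analogy using only the standard library. It is not Celery code: `concurrent.futures.Future` stands in for `AsyncResult`, `pool.submit()` for `.delay()`, `future.result()` for `result.get()`, and `future.done()` for a status check such as `.successful()`. The `release` event is a hypothetical device that holds the task back, so the "checked too early" case happens every time instead of only occasionally.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical gate: keeps the task from finishing until the caller allows it,
# which mimics a worker that has not yet picked up or completed the task.
release = threading.Event()


def hello():
    # Stand-in for the Celery task; it blocks until the gate is opened.
    release.wait(timeout=5)
    return 'Hello world!'


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(hello)          # comparable to hello.delay()

    # Checking status right away: the task has not finished yet.
    finished_early = future.done()

    release.set()                        # let the "worker" finish the task

    # Waiting first, comparable to result.get(): blocks until the task is done.
    value = future.result(timeout=5)

    # Checking status after waiting: now it is reliable.
    finished_after_wait = future.done()

print(finished_early, finished_after_wait, value)
```

In the real tests, passing a timeout to the wait, as in result.get(timeout=10), is probably worth doing so that a test fails instead of hanging when no worker is running; the value 10 is only an illustrative choice.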
Upvotes: 2