Reputation: 1962
I create one celery task which has dependency on other module. I delayed celery task at model save method and i want to pass all test cases without celery. Below is my code.
def save(self, force_insert=False, force_update=False, using=None,
update_fields=None):
super(Offers, self).save(force_insert=False, force_update=False, using=None,
update_fields=None)
change_offer_status.apply_async(args=[self.id], eta=self.valid_to, queue='credr_core_task',
routing_key='credr_core_task')
Test.py
class OfferTests(APITestCase):
authenticated_client = APIClient()
def setUp(self):
data = {
"username": "vivek",
"first_name": "Vivek",
"last_name": "Dogra",
"email": "[email protected]",
"contact_number": "9834982602",
"password": "easy"
}
self.user = User.objects.create(data)
token = Token.objects.get(user__username='vivek')
self.authenticated_client.credentials(HTTP_AUTHORIZATION='Token ' + token.key)
mock_task = Mock()
mock_task.get = Mock(return_value={'success': True})
print mock_task.get() # outputs {'success': True}
with patch('offers.tasks.change_offer_status.apply_async', new=mock_task) as mocked_task:
mocked_task.return_value = True
@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
CELERY_ALWAYS_EAGER=True, )
def test_add_offer(self):
"""
add user address
"""
start_date = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
end_date = datetime.strftime(datetime.now() + timedelta(days=3), "%Y-%m-%d %H:%M:%S")
data = {
"type": OfferTypeEnum.FLAT._value_,
"name": "CredR Offer",
"code": "FLAT100",
"status": OfferStatusEnum.ACTIVE._value_,
"value": "1004",
"discount_value": 1004,
"valid_from": start_date,
"valid_to": end_date,
"message": "GET FLAT 100"
}
response = self.authenticated_client.post(URL, data, format='json')
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
I have created one mock object in setUp method. But i get connection refused error of rabbitMq.
Upvotes: 2
Views: 6061
Reputation: 2273
From your words "...i want to pass all test cases without celery." and "...But i get connection refused error of rabbitMq", I guess you are trying to run the tests without triggering REAL celery tasks.
If that's true, you do not want to mock apply_async
. The real requirement is to prevent celery sending messages to queue to trigger the task. So you may want to mock send_task
of you celery app.
Says you have celeryapp
instance in offer
module. You may try to use
patch('offers.celeryapp.send_task', new=mock_task)
to replace
patch('offers.tasks.change_offer_status.apply_async', new=mock_task)
.
Then your celery app should not send message.
And if you are using Celery 4+ (I forgot detail version), CELERY_ALWAYS_EAGER
need to be changed to CELERY_TASK_ALWAYS_EAGER
Upvotes: 3
Reputation: 2099
i want to pass all test cases without celery
Set CELERY_ALWAYS_EAGER = True
when running your tests.
[...] tasks will be executed locally instead of being sent to the queue.
It may pay to read through the other configuration options here too: http://docs.celeryproject.org/en/latest/configuration.html#celery-always-eager
Upvotes: 3