Roman Nozhenko
Roman Nozhenko

Reputation: 628

My first test for a celery task

I'm new and I can not figure out, I need your help. I have a part of the API code that saves the object and sends an SMS when it's done. How is it tested? I would be grateful for your help. Thank you!

It looks something like this:

tasks.py/

@shared_task
send_sms(addr, text):
    ...


views.py/

def mydef():
     my_objs_list = MyModel.values().filter(my_flag=False) # addr list
     for item in my_objs_list:
         send_sms.delay(str(item), text)
     my_objs_list.update(my_flag=True)

Upvotes: 2

Views: 362

Answers (1)

mariodev
mariodev

Reputation: 15494

You need to mock the delay method and assert against whatever arguments you're passing:

@override_settings(CELERY_ALWAYS_EAGER=True)
@mock.patch('project.tasks.send_sms.delay')
def test_view(self, send_sms):
    ...
    send_sms.assert_called_with('item-value-1', 'text-value-1')
    send_sms.assert_called_with('item-value-2', 'text-value-2')

Upvotes: 1

Related Questions