AmirReza

Reputation: 365

How to set retry tasks in case of failure in Django-Celery

I'm trying to run a task using Celery. I need to send POST requests to a remote server when the user presses the send button, so I set up Celery with Redis using this configuration in my settings file:

BROKER_URL = os.environ.get("REDIS_URL")
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Dubai'

According to the documentation for apply_async, I can define retry options as in the code below:

__task_expiration = 60
__interval_start = 1 * 60

api_generator.apply_async(args=args,
                          group=user_key,
                          expires=__task_expiration,
                          retry=True,
                          retry_policy={
                              "max_retries": 3,
                              "interval_start": __interval_start
                          })

In the documentation I found this definition for apply_async:

apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)

Following the documentation, I can set this using the retry and retry_policy options.

The documentation also gives this sample code for defining retry options:

add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})

I want my task to be retried up to 3 times on any failure, with 60 seconds between retries. My task definition looks like this:

import json
import os

import requests
from celery import shared_task


@shared_task
def api_generator(*args):
    url = os.environ.get("API_URL_CALL")
    api_access_key = os.environ.get("API_ACCESS_KEY")

    headers = {
        "Authorization": api_access_key,
        "Content-Type": "application/json"
    }

    # generate_json is a helper that is not shown here
    json_schema = generate_json(*args)

    response = requests.post(url, headers=headers, data=json.dumps(json_schema), timeout=30)

    # Any non-200 response is treated as a failure
    if response.status_code != 200:
        raise NameError("API Response error")

    return response.status_code

But when my code fails, I don't see any retries in the Celery logs. What is the problem here? How can I define retries when calling my tasks with the apply_async method? I'm raising NameError to tell the worker that an error has occurred.

Upvotes: 2

Views: 6591

Answers (1)

Lior Pollak

Reputation: 3450

[EDIT 1: Added acks_late]

There are two things that can go wrong when you send a task to a Celery worker:

  1. Connection issues with the broker and Message Queue.
  2. Exceptions raised on the worker.

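The first issue can be solved by defining retry and retry_policy as you did. Those options only control how Celery retries publishing the task message to the broker when the connection fails. They do not re-run a task that raised an exception, which is why you see no retries in the worker logs.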
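To make the scope of retry_policy concrete, here is a rough model of the waits between publish attempts. It is a simplified sketch based on how the Celery docs describe these options, not the library's actual implementation, and publish_retry_waits is a made-up helper name.

```python
def publish_retry_waits(max_retries=3, interval_start=0.0,
                        interval_step=0.2, interval_max=0.2):
    """Seconds waited before each retry of *publishing* a task message.

    Simplified model (an assumption, not Celery's code): the first wait is
    interval_start, each later wait grows by interval_step, and no wait
    exceeds interval_max.
    """
    return [
        min(interval_start + attempt * interval_step, interval_max)
        for attempt in range(max_retries)
    ]


# Values from the documentation sample quoted in the question.
waits = publish_retry_waits(3, 0, 0.2, 0.2)
print(waits)       # [0.0, 0.2, 0.2]
print(sum(waits))  # 0.4
```

None of these waits is triggered by an exception inside the task body; they only apply while the message cannot be delivered to the broker.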

The second kind (which is what you want to solve) can be solved by calling self.retry() when the task fails. This requires a bound task (bind=True), so that the task receives self as its first argument.

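Depending on your type of problem, it might also help to set CELERY_ACKS_LATE = True. With this setting the task message is acknowledged after the task has executed rather than just before, so the task may be executed more than once if the worker crashes mid-execution and should therefore be idempotent.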

Check out these links for further information:

Retry Lost or Failed Tasks (Celery, Django and RabbitMQ)

https://coderbook.com/@marcus/how-to-automatically-retry-failed-tasks-with-celery/

Upvotes: 3
