Shamsiddin Parpiev
Shamsiddin Parpiev

Reputation: 99

Celery retrieve exception type or info from previous retries

My purpose is to set different max_retries for each exception type

Example code

@app.task(bind=True, serializer='json')
def create_user(self, client):
    try:
        data = client.get('users/')
    except requests.ConnectionError as e:
        raise self.retry(exc=e, max_retries=10) from e
    except requests.ReadTimeout as e:
        raise self.retry(exc=e, max_retries=5) from e

The goal is to stop retrying task only if 5 ReadTimeout errors occured or 10 ConnectionError occured. The current code unifies the retries: if 6 ConnectionErrors occured, and then ReadTimeout is raised the celery does not retry the task. Is there a way to set retries for each error type

Upvotes: 0

Views: 30

Answers (1)

Lemon Reddy
Lemon Reddy

Reputation: 637

It's not possible to set request.retries anything other than integer. This attribute is what used internally to check with the max_retries. If you really need to do something like this, you can send retries inside the task_kwargs and then compare it with max_retries inside a custom BaseTask implementation as follows,

import requests
from celery import Task

class CustomFailureTask(Task):
    connection_max_retries = 5
    timeout_max_retries = 10
    default_retry_delay = 0
    max_retries = 10

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print(f"{self.timeout_max_retries} timeout errors")
        if isinstance(exc, requests.ConnectionError):
            if kwargs.get("connection_retries",
                          0) < self.connection_max_retries:
                kwargs["connection_retries"] = kwargs.get(
                    "connection_retries", 0) + 1
        elif isinstance(exc, requests.ReadTimeout):
            if kwargs.get("timeout_retries", 0) < self.timeout_max_retries:
                kwargs["timeout_retries"] = kwargs.get("timeout_retries",
                                                       0) + 1
                self.retry(exc=exc, kwargs=kwargs, throw=False)

@app.task(bind=True, serializer='json', base=CustomFailureTask)
def create_user(self, client):
    data = client.get('users/')

Upvotes: 0

Related Questions