Reputation: 365
I'm trying to run a task using Celery. I need to send POST requests to a remote server when the user presses the send button, so I tried Celery with Redis, using this configuration in my settings file:
BROKER_URL = os.environ.get("REDIS_URL")
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Dubai'
According to the documentation for apply_async, I can define retry options like the code below:
__task_expiration = 60
__interval_start = 1 * 60

api_generator.apply_async(args=args,
                          group=user_key,
                          expires=__task_expiration,
                          retry=True,
                          retry_policy={
                              "max_retries": 3,
                              "interval_start": __interval_start
                          })
In the documentation I found this definition for apply_async:
apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)
Following the documentation, I can set this using retry and retry_policy. The docs also give sample code for how to define the retry options:
add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})
I want my task to run 3 times in case of any failure, with a 60-second interval between retries. My task definition looks like this:
@shared_task
def api_generator(*args):
    import requests
    import json
    url = os.environ.get("API_URL_CALL")
    api_access_key = os.environ.get("API_ACCESS_KEY")
    headers = {
        "Authorization": api_access_key,
        "Content-Type": "application/json"
    }
    json_schema = generate_json(*args)
    response = requests.request("POST", url, headers=headers,
                                data=json.dumps(json_schema), timeout=30)
    if response.status_code != 200:
        raise NameError("API Response error")
    return response.status_code
But when my code fails, I don't see any retry attempt in the Celery logs. What is the problem here? How can I define retries when calling my tasks with the apply_async method? I'm raising the NameError to tell the worker that an error has occurred.
Upvotes: 2
Views: 6591
Reputation: 3450
[EDIT 1: Added acks_late]
There are two things that can go wrong when you send a task to a Celery worker:

1. Delivering the task message to the broker can fail, for example because of a connection problem between your application and Redis.
2. The task itself can fail while the worker is executing it.

The first issue can be solved by defining retry and retry_policy as you did; those options only control retrying the publishing of the message to the broker.
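As a minimal sketch (reusing the names from your question; the values are illustrative), the publish-retry options only affect delivering the message to Redis, not what happens once the worker runs the task:

api_generator.apply_async(
    args=args,
    expires=60,
    retry=True,               # retry *sending* the message if the broker is unreachable
    retry_policy={
        "max_retries": 3,     # give up publishing after 3 attempts
        "interval_start": 60  # wait 60 seconds before the first publish retry
    },
)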
The second kind (which is what you want to solve) can be solved by calling self.retry() inside the task when it fails; for that the task has to be bound (bind=True) so it receives a reference to itself.
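A minimal sketch of how your task could do that, assuming the same NameError you already raise (the countdown and max_retries values are just examples, and call_remote_api is a placeholder for your request code):

from celery import shared_task


@shared_task(bind=True, max_retries=3)
def api_generator(self, *args):
    try:
        response = call_remote_api(*args)  # placeholder for your requests.request(...) call
        if response.status_code != 200:
            raise NameError("API Response error")
        return response.status_code
    except NameError as exc:
        # Re-queue this task with the same arguments and run it again in 60 seconds,
        # up to max_retries times.
        raise self.retry(exc=exc, countdown=60)

With bind=True the first argument of the task is the task instance itself, and self.retry() raises a Retry exception that re-schedules the task; the 60-second countdown gives you the interval you were trying to get from retry_policy. Recent Celery versions also support autoretry_for=(NameError,) on the decorator if you prefer not to call self.retry() yourself.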
Depending on your type of problem, it might also be helpful to set CELERY_ACKS_LATE = True, so that the message is acknowledged only after the task has run and can be redelivered if the worker dies mid-execution.
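If you go that route, the setting sits next to your other old-style uppercase CELERY_* settings:

# Acknowledge the task message only after the task has finished,
# so it can be redelivered if the worker crashes mid-task.
CELERY_ACKS_LATE = True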
Check out these links for further information:
Retry Lost or Failed Tasks (Celery, Django and RabbitMQ)
https://coderbook.com/@marcus/how-to-automatically-retry-failed-tasks-with-celery/
Upvotes: 3