Imran S.

Reputation: 935

How to retry a celery task without duplicating it - SQS

I have a Celery task that takes a message from an SQS queue and tries to run it. If it fails, it is supposed to retry every 10 minutes, up to 144 times. What I think is happening is that when the task fails, the message goes back into the queue and, at the same time, a new message is created for the retry, so there are now two. Both of these fail again and follow the same pattern, becoming four messages in total, and so on. So if I let it run for some time, the queue gets clogged.

What I can't figure out is the proper way to retry the task without duplicating it. Below is the code that does the retrying. Could someone guide me here?

from celery import shared_task
from celery.exceptions import MaxRetriesExceededError


@shared_task
def send_br_update(bgc_id, xref_id, user_id, event):
    from myapp.models.mappings import BGC

    try:
        bgc = BGC.objects.get(pk=bgc_id)
        return bgc.send_br_update(user_id, event)

    except BGC.DoesNotExist:
        pass

    except MaxRetriesExceededError:
        pass

    except Exception as exc:
        # retry every 10 minutes for at least 24 hours
        raise send_br_update.retry(exc=exc, countdown=600, max_retries=144)

Update: More explanation of the issue...

A user creates an object in my database. Other users act upon that object, and as they change its state, my code emits signals. The signal handler then initiates a Celery task, which means it connects to the desired SQS queue and submits the message to it. The Celery server, running the workers, sees that new message and tries to execute the task. This is where it fails and the retry logic comes in.
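The hand-off from the signal handler to the queue is just a .delay() call, roughly like the sketch below (the signal name and module paths here are placeholders, not my actual code):

from django.dispatch import receiver

from myapp.signals import bgc_state_changed  # placeholder signal name
from myapp.tasks import send_br_update


@receiver(bgc_state_changed)
def on_bgc_state_changed(sender, bgc_id, xref_id, user_id, event, **kwargs):
    # .delay() serializes the arguments into a message and pushes it onto
    # the SQS queue; a worker on the Celery server picks it up from there.
    send_br_update.delay(bgc_id, xref_id, user_id, event)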

According to the Celery documentation, to retry a task all we need to do is raise a self.retry() call with a countdown and/or max_retries. If a Celery task raises an exception, it is considered failed. I am not sure how SQS handles this. All I know is that one task fails and there are two in the queue, both of these fail and then there are 4 in the queue, and so on...
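For reference, the bound-task form of that pattern (bind=True plus self.retry), as shown in the docs, would look roughly like this sketch, with the same countdown and max_retries as my code above:

from celery import shared_task


@shared_task(bind=True, max_retries=144)
def send_br_update(self, bgc_id, xref_id, user_id, event):
    from myapp.models.mappings import BGC

    try:
        bgc = BGC.objects.get(pk=bgc_id)
        return bgc.send_br_update(user_id, event)
    except BGC.DoesNotExist:
        pass
    except Exception as exc:
        # self.retry() raises a Retry exception, so the worker marks this
        # run as "to be retried" instead of failed.
        raise self.retry(exc=exc, countdown=600)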

Upvotes: 4

Views: 4253

Answers (3)

m0y

Reputation: 138

I have recently encountered something similar and none of the other answers really helped me.

I have found there is a Caveats section in Celery's documentation related to SQS:

If a task isn’t acknowledged within the visibility_timeout, the task will be redelivered to another worker and executed.

This causes problems with ETA/countdown/retry tasks where the time to execute exceeds the visibility timeout; in fact if that happens it will be executed again, and again in a loop.

In short, you have to make sure the visibility_timeout in SQS is higher than the countdown value you use in Celery to prevent the task duplication from happening.
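As a sketch (values are examples only), with the 600-second countdown from the question you would set the transport option to something comfortably larger:

# celery.py -- sketch: the SQS visibility timeout must exceed any
# countdown/ETA used by the tasks (600 s in the question).
from celery import Celery

app = Celery('myapp', broker='sqs://')
app.conf.broker_transport_options = {
    'visibility_timeout': 3600,  # seconds; larger than the 600 s countdown
}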

Upvotes: 1

gCoh

Reputation: 3089

This is probably a little bit late, but I have written a backoff policy for Celery + SQS as a patch.

You can see how it is implemented in this repository

https://github.com/galCohen88/celery_sqs_retry_policy/blob/master/svc/celery.py
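For what it's worth, recent Celery versions can also express an exponential backoff directly through task options; a minimal sketch, not the code from the repository above:

from celery import shared_task


@shared_task(
    autoretry_for=(ConnectionError,),  # retry automatically on this exception
    retry_backoff=10,                  # wait 10 s, 20 s, 40 s, ... between retries
    retry_kwargs={'max_retries': 10},
)
def my_task(payload):
    # placeholder body: pretend to call a flaky downstream service
    if not payload:
        raise ConnectionError('downstream unavailable')
    return payload

The SQS visibility_timeout caveat mentioned in the other answer still applies to the backoff delays.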

Upvotes: 1

mootmoot

Reputation: 13176

This is NOT a Celery or SQS issue. The real issue is the workflow, i.e. the way you send messages to the MQ service and handle them, and that is what causes the duplication. You would face the same problem with any other MQ service.

Imagine your flow:

  1. Script: reads the task message. MQ message: locked for 30 seconds.
  2. Script: task fails. MQ message: lock times out, the message is now free to be grabbed again.
  3. Script: creates another task message.
  4. Script: repeats step 1. MQ: there are now 2 messages with the same task, so step 1 will launch 2 tasks.

So if the task keeps failing, the messages keep multiplying: 2, 4, 8, 16, 32...

If the Celery script is meant to "recreate the failed task and send it to the message queue", you have to make sure these messages can only be read ONCE. **You MUST discard the task message after it has been read once, even if the task failed.**

There are at least 2 ways to do this; choose one:

  1. Delete the message before recreating the task, OR
  2. In SQS, enforce this by creating a dead-letter queue and configuring the redrive policy with Maximum Receives set to 1. This makes sure a message that has been read once is never recycled.

You may prefer method 2, because method 1 requires you to configure Celery to "consume" (read and delete) the message as soon as it reads it, which is not very practical (and you must make sure you delete it before creating a new message for the failed task). The dead-letter queue is also a way to check whether Celery CRASHED, i.e. a message that has been read once but not consumed (deleted) means the program stopped somewhere.
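A sketch of method 2 with boto3 (queue names and URLs are placeholders):

# Sketch: attach a dead-letter queue with maxReceiveCount=1 so a message
# that has been received once is never redelivered to the work queue.
import json

import boto3

sqs = boto3.client('sqs')

dlq_url = sqs.create_queue(QueueName='mytasks-dead-letter')['QueueUrl']
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_url, AttributeNames=['QueueArn'],
)['Attributes']['QueueArn']

sqs.set_queue_attributes(
    QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789012/mytasks',  # placeholder
    Attributes={
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': '1',
        }),
    },
)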

Upvotes: 1
