Azima

Reputation: 4151

Celery: How to add a delay to a message sent to SQS

I'm using Celery to consume messages from an SQS queue.

The queue is a Standard queue.

In some cases (when an exception is caught), I explicitly re-enqueue the task:

    def run(self):
        try:
            # processing that may raise an exception
            ...
        except Exception as e:
            log.error(str(e), exc_info=True)
            self.enqueue_message()
            return

    def enqueue_message(self, task='llm_extraction_task', queue='llm-extraction-queue'):
        # TODO retries mechanism
        llm_app.send_task(name=task, kwargs=self.message, queue=queue)

Messages are consumed by this task:

    @shared_task(name="llm_extraction_task")
    def check_nlp_job_status(**kwargs):
        log.info("Message received in llm_extraction_task")
        consume_llm_data_obj = ConsumeLLMData(payload=kwargs)
        consume_llm_data_obj.run()

The enqueue_message call pushes the message back onto the queue immediately.

The problem is that when the queue holds only a few messages (say, a SINGLE message), the re-enqueued message is consumed again immediately.

I want to add a timer or delay to such messages so that failed messages are given lower priority and picked up later.
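
For illustration, here is a rough sketch of the kind of delay I have in mind. As far as I understand, send_task accepts the same execution options as apply_async, including countdown (the number of seconds to wait before the task may run). The "attempt" key in the payload, the backoff_seconds helper, and the base/cap values are hypothetical names chosen for this example, not part of my current code. The send_task argument stands in for llm_app.send_task and is passed in only so that the sketch can run without a broker.

```python
from typing import Any, Callable, Dict


def backoff_seconds(attempt: int, base: int = 30, cap: int = 600) -> int:
    """Return a delay that doubles per attempt (30s, 60s, 120s, ...) up to a cap.

    The base and cap values are arbitrary placeholders.
    """
    return min(cap, base * (2 ** attempt))


def enqueue_message(
    send_task: Callable[..., Any],
    message: Dict[str, Any],
    task: str = "llm_extraction_task",
    queue: str = "llm-extraction-queue",
) -> int:
    """Re-enqueue a failed message with a countdown instead of immediately."""
    # Hypothetical counter carried in the payload to track retries.
    attempt = message.get("attempt", 0)
    payload = {**message, "attempt": attempt + 1}
    delay = backoff_seconds(attempt)
    # countdown asks Celery not to execute the task for `delay` seconds.
    send_task(name=task, kwargs=payload, queue=queue, countdown=delay)
    return delay
```

Inside the class this would be called as enqueue_message(llm_app.send_task, self.message). One caveat I am aware of: with the SQS transport the countdown appears to be enforced by the worker, which holds the received message until its ETA, so the broker's visibility_timeout would need to be longer than the largest delay; otherwise SQS may redeliver the message.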

Upvotes: 0

Views: 34

Answers (0)
