Reputation: 4151
I'm using Celery to consume messages from SQS queue.
The queue is Standard type.
There are cases [exceptions caught] when I explicitly re-enqueue tasks back to the queue.
def run(self):
try:
# some exceptions occurred
...
except Exception as e:
log.error(str(e), exc_info=True)
self.enqueue_message()
return
def enqueue_message(self, task='llm_extraction_task', queue='llm-extraction-queue'):
# TODO retries mechanism
llm_app.send_task(name=task, kwargs=self.message, queue=queue)
Messages are consumed:
@shared_task(name= "llm_extraction_task")
def check_nlp_job_status(**kwargs):
log.info("Message received in llm_extraction_task")
consume_llm_data_obj = ConsumeLLMData(payload=kwargs)
consume_llm_data_obj.run()
This will immediately push the message back to the queue.
The problem is that with a fewer messages - say a SINGLE message - the same re-enqueued message is immediately consumed back.
I want to add a timer or delay to such messages so that failed messages are prioritized or picked later.
Upvotes: 0
Views: 34