desergik

Reputation: 55

Celery shutdown worker and retry the task

I need to implement the following logic in a Celery task: if some condition is met, shut down the current worker and retry the task.

I tested this on a sample task:

@app.task(bind=True, max_retries=1)
def shutdown_and_retry(self, config):
    try:
        raise Exception('test exception')
    except Exception as exc:
        print('Retry {}/{}, task id {}'.format(self.request.retries, self.max_retries, self.request.id))
        app.control.shutdown(destination=[self.request.hostname])  # send shutdown signal to the current worker
        raise self.retry(exc=exc, countdown=5)
    print('Execute task id={} retries={}'.format(self.request.id, self.request.retries))
    return 'some result'

But it gives strange results. Steps to reproduce:

  1. Run worker: celery worker -Q test_queue -A test_worker -E -c 1 -n test_worker_1.
  2. Push a task to the "test_queue" queue (see the publisher sketch after this list).
  3. The worker caught it and shut down. I opened the list of tasks in 'test_queue' in RabbitMQ and saw:
    • The original task submitted by the publisher, retries = 0 (pushed back by the app.control.shutdown() call);
    • A copy of the original task (with the same id), retries = 1 (published by the self.retry() call).
  4. Then I started another worker on the same queue; it caught the task and also shut down. But one more copy of the original task (same id, retries = 1) was pushed to the broker, so I had 3 tasks in the queue. Every subsequent worker run added one more task to the queue. The max_retries = 1 limit had no effect in this case.
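
For step 2, the task was published roughly like this (a minimal sketch; the argument value is a placeholder, while the module and queue names are taken from the worker command above):

from test_worker import shutdown_and_retry

# send the task to the same queue the worker consumes from (-Q test_queue)
shutdown_and_retry.apply_async(args=[{'some': 'config'}], queue='test_queue')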

What I have tried:

  1. Set task_reject_on_worker_lost = True in celeryconfig.py and ran the same task. Result: nothing changed.
  2. Left only the shutdown call in the worker's task. Result: only the original task is pushed back on each try (no task duplication), but retries are not counted (always 0).
  3. Added app.control.revoke(self.request.id) before the shutdown and retry calls in the worker (based on this; see the sketch after this list). Result: after the first try I got the same picture (2 tasks in the queue), but when I started a second worker the queue was flushed and nothing ran. So the task is lost and not retried.
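
For reference, this is roughly what the task body looked like in attempt 3 (a reconstruction, not the exact code):

@app.task(bind=True, max_retries=1)
def shutdown_and_retry(self, config):
    try:
        raise Exception('test exception')
    except Exception as exc:
        app.control.revoke(self.request.id)  # revoke the currently executing task id
        app.control.shutdown(destination=[self.request.hostname])  # stop this worker only
        raise self.retry(exc=exc, countdown=5)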

Is there a way to avoid pushing the original task back to the queue during the app.control.shutdown() call? It seems that this is the root cause. Or could you suggest another workaround that would implement the logic described above?

Setup: RabbitMQ 3.8.2, celery 4.1.0, python 3.5.4

Settings in celeryconfig.py:

task_acks_late = True
task_acks_on_failure_or_timeout = True
task_reject_on_worker_lost = False
task_track_started = True
worker_prefetch_multiplier = 1
worker_disable_rate_limits = True

Upvotes: 1

Views: 3090

Answers (1)

Robert Kearns

Reputation: 1706

It looks like the issue is task_acks_late in your configuration file. By using that, you are saying "only remove the task from the queue when I have finished running". You then kill the worker, so the message is never acknowledged and gets redelivered, which is why you see duplicates of the task.
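
One way to act on this (a sketch of a possible adjustment, assuming an early ack is acceptable for this particular task): disable acks_late on the task and publish the retry without raising, so that the retry message is the only copy left on the broker when the worker goes down.

@app.task(bind=True, max_retries=1, acks_late=False)
def shutdown_and_retry(self, config):
    try:
        raise Exception('test exception')
    except Exception as exc:
        # publish the retry message without raising, so it reaches the broker
        # before this worker goes away
        self.retry(exc=exc, countdown=5, throw=False)
        # then ask only the current worker to shut down
        app.control.shutdown(destination=[self.request.hostname])

The trade-off: with acks_late disabled, the original message is removed from the queue as soon as the worker receives it, so a crash before self.retry() publishes would lose that run.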

Upvotes: 1
