I need to implement the following logic in a Celery task: if some condition is met, shut down the current worker and retry the task.
I tested this on a sample task:
@app.task(bind=True, max_retries=1)
def shutdown_and_retry(self, config):
    try:
        raise Exception('test exception')  # simulate the condition that requires a restart
    except Exception as exc:
        print('Retry {}/{}, task id {}'.format(self.request.retries, self.max_retries, self.request.id))
        app.control.shutdown(destination=[self.request.hostname])  # send shutdown signal to the current worker
        raise self.retry(exc=exc, countdown=5)
    print('Execute task id={} retries={}'.format(self.request.id, self.request.retries))
    return 'some result'
But it gives strange results when the task is run on a worker started with:
celery worker -Q test_queue -A test_worker -E -c 1 -n test_worker_1
What I have tried:
1. Set task_reject_on_worker_lost = True in celeryconfig.py and ran the same task. Result: nothing changed.
2. Called app.control.revoke(self.request.id) before the shutdown and retry calls in the worker (based on this). Result: after the first try I got the same (2 tasks in the queue), but when I ran a second worker the queue was flushed and it didn't run anything. So the task is lost and not retried.
Is there a way not to push the original task back to the queue during the app.control.shutdown() call? It seems that this is the root cause. Or could you please suggest another workaround that would implement the logic described above?
Setup: RabbitMQ 3.8.2, Celery 4.1.0, Python 3.5.4
Settings in celeryconfig.py:
task_acks_late = True
task_acks_on_failure_or_timeout = True
task_reject_on_worker_lost = False
task_track_started = True
worker_prefetch_multiplier = 1
worker_disable_rate_limits = True
It looks like the issue is task_acks_late
in your configuration file. By using that, you are saying "only remove the task from the queue when I have finished running it". You then kill the worker, so the message is never acknowledged; RabbitMQ requeues the unacknowledged message, and you get duplicates of the task.
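If you need to keep task_acks_late and still shut the worker down, a minimal sketch of one possible rework is below. It relies on two assumptions: that publishing the retry with Task.retry(..., throw=False) (which sends the retry message without raising celery.exceptions.Retry) fits your flow, and that the warm shutdown triggered by app.control.shutdown() lets the currently running task return, so its message is acknowledged instead of being requeued. Timing between the ack and the shutdown is not guaranteed, so treat this as a sketch rather than a definitive fix:
@app.task(bind=True, max_retries=1)
def shutdown_and_retry(self, config):
    try:
        raise Exception('test exception')
    except Exception as exc:
        print('Retry {}/{}, task id {}'.format(
            self.request.retries, self.max_retries, self.request.id))
        # Publish the retry as a new message without raising Retry.
        # Note: if max_retries is exceeded, retry() raises even with throw=False.
        self.retry(exc=exc, countdown=5, throw=False)
        # Ask the current worker for a warm shutdown; it finishes running tasks first.
        app.control.shutdown(destination=[self.request.hostname])
        # Return normally so that, with task_acks_late, this delivery is
        # acknowledged and the broker does not push the original message back.
        return 'retry scheduled, worker shutting down'
Alternatively, if you do not need late acknowledgement at all, leaving task_acks_late at its default of False makes the broker remove the message as soon as the worker receives it, so a shutdown no longer requeues it; the trade-off is that a hard worker crash then loses the in-flight task.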