Reputation: 8187
We use a celery worker in a docker instance. If the docker instance is killed (docker could be changed and brought back up) we need to retry the task. My task currently looks like this:
@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build(self, config, import_data):
build_chain = chain(
build_dataset_docstore.s(config, import_data),
build_index.s(),
assemble_bundle.s()
).on_error(handle_chain_error.s())
return build_chain
@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build_dataset_docstore(self, config, import_data):
# do lots of stuff
@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build_index(self, config, import_data):
# do lots of stuff
@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def assemble_bundle(self, config, import_data):
# do lots of stuff
To imitate the container being restarted (worker being killed) I'm running the following script:
SLEEP_FOR=1
echo "-> Killing worker"
docker-compose-f docker/docker-compose-dev.yml kill worker
echo "-> Waiting $SLEEP_FOR seconds"
sleep $SLEEP_FOR
echo "-> Bringing worker back to life"
docker-compose-f docker/docker-compose-dev.yml start worker
Looking in flower i see the task is STARTED... cool, but...
EDIT: from the docs:
If the worker won’t shutdown after considerate time, for being stuck in an infinite-loop or similar, you can use the KILL signal to force terminate the worker: but be aware that currently executing tasks will be lost (i.e., unless the tasks have the acks_late option set).
I'm using the acks late option, so why isn't this retrying?
Upvotes: 3
Views: 909
Reputation: 8187
The issue here seems to be task_acks_late
(https://docs.celeryproject.org/en/latest/userguide/configuration.html#task-acks-late), which i assume is a param for the celery app, on the task.
I updated task_acks_late
to acks_late
and added reject_on_worker_lost
and this functions as expected.
Thus:
@app.task(bind=True, max_retries=3, default_retry_delay=5, acks_late=True, reject_on_worker_lost=True)
def assemble_bundle(self, config, import_data):
# do lots of stuff
Upvotes: 3