Reputation: 3685
I'm using celery (solo pool with concurrency=1) and I want to be able to shut down the worker after a particular task has run. A caveat is that I want to avoid any possibility of the worker picking up any further tasks after that one.
Here's an outline of my attempt:
from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.exceptions import WorkerShutdown
from celery.signals import task_postrun
app = Celery()
app.config_from_object('celeryconfig')
@app.task
def add(x, y):
    return x + y

@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
    raise WorkerShutdown()
However, when I run the worker
celery -A celeryapp worker --concurrency=1 --pool=solo
and run the task
add.delay(1,4)
I get the following:
-------------- celery@sam-APOLLO-2000 v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial 2018-03-18 14:08:37
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: __main__:0x7f596896ce90
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 4 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[2018-03-18 14:08:39,892: WARNING/MainProcess] Restoring 1 unacknowledged message(s)
The task is re-queued and will be run again on another worker, leading to a loop.
This also happens when I raise the WorkerShutdown exception within the task itself:
@app.task
def add(x, y):
    print(x + y)
    raise WorkerShutdown()
Is there a way I can shut down the worker after a particular task, while avoiding this unfortunate side-effect?
Upvotes: 31
Views: 19342
Reputation: 73
Adding to sytech's answer: if you are using the @shared_task decorator instead of @app.task, and/or you don't want to use bind=True, you can use this option without the self parameter:
@shared_task
def some_function_name():
    app.control.revoke(some_function_name.request.id)
    app.control.shutdown()
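If you chose @shared_task precisely because there is no module-level app object to import, a minimal variant of the same idea (a sketch, assuming nothing beyond a configured Celery app) is to go through celery.current_app instead:

from celery import shared_task, current_app

@shared_task
def some_function_name():
    # same idea as above, but current_app avoids importing a module-level `app`,
    # which is usually the reason for choosing @shared_task in the first place
    current_app.control.revoke(some_function_name.request.id)
    current_app.control.shutdown()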
Reference: https://stackoverflow.com/a/18876650/1856563
Upvotes: 1
Reputation: 55
If you need to shut down a specific worker and don't know its name in advance, you can get it from the task properties. Based on the answers above, you can use:
app.control.shutdown(destination=[self.request.hostname])
or
app.control.broadcast('shutdown', destination=[self.request.hostname])
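For example, here is a minimal sketch of how this could look inside a bound task (the task name, signature, and app object are illustrative assumptions):

@app.task(bind=True)
def add_and_stop(self, x, y):
    result = x + y
    # shut down only the worker executing this task; its node name is taken
    # from the task request, so it never has to be known in advance
    app.control.shutdown(destination=[self.request.hostname])
    return result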
Note:
- there is no need to start the worker with a specific name (the '-n' option);
- the task must be defined with the bind=True parameter.

Upvotes: 3
Reputation: 29514
If you shut down the worker after the task has completed, it won't be re-queued.
@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
    app.control.broadcast('shutdown')
This will gracefully shut down the worker after the task has completed.
[2018-04-01 18:44:14,627: INFO/MainProcess] Connected to redis://localhost:6379/0
[2018-04-01 18:44:14,656: INFO/MainProcess] mingle: searching for neighbors
[2018-04-01 18:44:15,719: INFO/MainProcess] mingle: all alone
[2018-04-01 18:44:15,742: INFO/MainProcess] celery@foo ready.
[2018-04-01 18:46:28,572: INFO/MainProcess] Received task: celery_worker_stop.add[ac8a65ff-5aad-41a6-a2d6-a659d021fb9b]
[2018-04-01 18:46:28,585: INFO/ForkPoolWorker-4] Task celery_worker_stop.add[ac8a65ff-5aad-41a6-a2d6-a659d021fb9b] succeeded in 0.005628278013318777s: 3
[2018-04-01 18:46:28,665: WARNING/MainProcess] Got shutdown from remote
Note: broadcast will shut down all workers. If you want to shut down a specific worker, start the worker with a name:
celery -A celeryapp worker -n self_killing --concurrency=1 --pool=solo
Now you can shut down this specific worker using the destination parameter:
app.control.broadcast('shutdown', destination=['celery@self_killing'])
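Putting the two together, a minimal sketch (assuming the worker was started with -n self_killing as above) would be:

@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
    # only the worker named 'celery@self_killing' receives the shutdown;
    # any other workers keep running
    app.control.broadcast('shutdown', destination=['celery@self_killing'])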
Upvotes: 5
Reputation: 40861
The recommended process for shutting down a worker is to send the TERM signal. This will cause a celery worker to shut down after completing any currently running tasks. If you send a QUIT signal to the worker's main process, the worker will shut down immediately.
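If you manage the worker process yourself, a minimal sketch of sending those signals from Python (worker_pid is a placeholder for the main process's pid, e.g. read from a pidfile) would be:

import os
import signal

worker_pid = 12345  # placeholder: pid of the worker's main process

os.kill(worker_pid, signal.SIGTERM)   # warm shutdown: finish running tasks, then exit
# os.kill(worker_pid, signal.SIGQUIT) # cold shutdown: terminate immediately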
The celery docs, however, usually discuss this in terms of managing celery from the command line or via systemd/initd, but celery additionally provides a remote worker control API via celery.app.control.
You can revoke a task to prevent workers from executing it, which should prevent the loop you are experiencing. Further, control supports shutting down a worker in the same manner.
So I imagine the following will get you the behavior you desire.
@app.task(bind=True)
def shutdown(self):
    app.control.revoke(self.request.id)  # prevent this task from being executed again
    app.control.shutdown()  # send shutdown signal to all workers
Since it's not currently possible to ack the task from within the task and then continue executing it, this approach of using revoke circumvents the problem: even if the task is queued again, the new worker will simply ignore it.
Alternatively, the following would also prevent a redelivered task from being executed a second time...
from celery.exceptions import Ignore

@app.task(bind=True)
def some_task(self):
    if self.request.delivery_info['redelivered']:
        raise Ignore()  # ignore if this task was redelivered
    print('This should only execute on first receipt of task')
Also worth noting: AsyncResult also has a revoke method that calls self.app.control.revoke for you.
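For example (a minimal sketch; the task and argument values are just illustrative):

result = add.delay(1, 4)  # returns an AsyncResult
result.revoke()           # equivalent to app.control.revoke(result.id)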
Upvotes: 14