Reputation: 21
I am using Celery to run very long tasks (multiple hours) and want to be able to kill individual tasks.
If I am not mistaken, this is the correct way to gracefully terminate a task:
task = long_task.AsyncResult(task_id)
task.revoke(terminate=True)
My task looks like this:
@celery.task(bind=True, throws=(Terminated,))
def long_task(self):
try:
... do all kinds of things ...
except Terminated:
print("Task was terminated")
except:
print("Unknown exception!")
The first time I terminate a task it prints "Unknown exception!" on the console, so an exception is raised and caught within the task, but somehow not the Terminated exception. I don't understand why it's not catching the Terminated exception.
I'm importing the Terminated exception like this:
from billiard.exceptions import Terminated
But where things really get weird is that every time I terminate a SECOND task, the console gives me an ERROR/MainProcess Task handler raised error: Terminated(15,). What I think is happening is that this time the task doesn't raise an exception but the worker does.
I suppose I could wrap task.revoke() in an exception handler, but what would be the point -- why would task.revoke() raise an exception like that?
By the way, any third task acts like the first, a fourth task acts like the second, etc. So it's inconsistent across tasks, but 'consistently inconsistent'.
Any suggestions?
Using Celery 4.1.0, Python 3.5.2 and Redis 4.0.7.
Upvotes: 2
Views: 2119
Reputation: 43
I had the same problem but I'm using celery v4.4.0 and python 3.7
I found that despite the error message stating the following
Task handler raised error: Terminated(15)
Traceback (most recent call last):
File "[...]/pool.py", line 1728, in _set_terminated
raise Terminated(-(signum or 0))
billiard.exceptions.Terminated: 15
this is apparently just a log message and a SystemExit Exception is actually raised. It's important to note that SystemExit doesn't inherit from Exception so you need to catch it specifically with SystemExit
try:
...
except SystemExit as e:
...
My full working example is the following, it seems to work without specifying the exceptions in the task definition.
@app.task()
def long_task():
try:
... do the long thing ...
except SystemExit:
print("Task was stopped manually")
except:
print("Some other error occured during task execution")
raise
Upvotes: 1