Lars de Kock

Reputation: 21

Gracefully terminating a celery task

I am using Celery to run very long tasks (multiple hours) and want to be able to kill individual tasks.

If I am not mistaken, this is the correct way to gracefully terminate a task:

task = long_task.AsyncResult(task_id)
task.revoke(terminate=True)

My task looks like this:

@celery.task(bind=True, throws=(Terminated,))
def long_task(self):
    try:
        ... do all kinds of things ...   
    except Terminated:
        print("Task was terminated")
    except:
        print("Unknown exception!")

The first time I terminate a task it prints "Unknown exception!" on the console, so an exception is raised and caught within the task, but somehow not the Terminated exception. I don't understand why it's not catching the Terminated exception.

I'm importing the Terminated exception like this:

from billiard.exceptions import Terminated

But where things really get weird is that every time I terminate a SECOND task, the console gives me an ERROR/MainProcess Task handler raised error: Terminated(15,). What I think is happening is that this time the task doesn't raise an exception but the worker does.

I suppose I could wrap task.revoke() in an exception handler, but what would be the point -- why would task.revoke() raise an exception like that?

By the way, any third task acts like the first, a fourth task acts like the second, etc. So it's inconsistent across tasks, but 'consistently inconsistent'.

Any suggestions?

Using Celery 4.1.0, Python 3.5.2 and Redis 4.0.7.

Upvotes: 2

Views: 2119

Answers (1)

C. Zeil

Reputation: 43

I had the same problem, although I'm using Celery v4.4.0 and Python 3.7.

I found that despite the error message stating the following

Task handler raised error: Terminated(15)
Traceback (most recent call last):
File "[...]/pool.py", line 1728, in _set_terminated
    raise Terminated(-(signum or 0))
billiard.exceptions.Terminated: 15

this is apparently just a log message, and the exception actually raised inside the task is SystemExit. It's important to note that SystemExit inherits from BaseException rather than Exception, so an "except Exception" clause will not catch it. You need to catch SystemExit explicitly:

try:
    ...
except SystemExit as e:
    ...
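
To illustrate the point about the exception hierarchy, here is a minimal sketch in plain Python. It does not involve Celery at all, and the helper name classify is made up for this example. It only shows which handler ends up catching a SystemExit compared with an ordinary exception.

```python
def classify(exc):
    """Raise exc and report which handler catches it (hypothetical helper)."""
    try:
        try:
            raise exc
        except Exception:
            # Ordinary errors (ValueError, KeyError, ...) land here.
            return "caught by 'except Exception'"
    except SystemExit:
        # SystemExit skips the inner handler and is only caught here.
        return "caught by 'except SystemExit'"


# SystemExit derives from BaseException, not from Exception.
print(issubclass(SystemExit, Exception))
print(issubclass(SystemExit, BaseException))

print(classify(ValueError("boom")))
print(classify(SystemExit(15)))
```

A bare "except:" clause, as used in the question, catches BaseException subclasses too, which would explain why the question's handler printed "Unknown exception!" instead of reaching the Terminated branch.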

My full working example is below. It seems to work without specifying the exceptions (the throws argument) in the task definition.

@app.task()
def long_task():
    try:
        ... do the long thing ...
    except SystemExit:
        print("Task was stopped manually")
    except:
        print("Some other error occurred during task execution")
        raise

Upvotes: 1
