Danilo Bargen
Danilo Bargen

Reputation: 19432

Celery: Abort or revoke all tasks in a chord

I use the following setup with a Redis broker and backend:

chord([A, A, A, ...])(B)

The user has the possibility to abort the A tasks. Unfortunately, when calling AbortableAsyncResult(task_a_id).abort() on all the task A instances, only the active ones are being aborted. The status for tasks that have not been received yet by a worker are changed to ABORTED, but they're still processed and the is_aborted() flag returns False.

I could of course revoke() the pending tasks instead of abort()-ing them, but the problem is that in that case the chord body (task B) is not executed anymore.

How can all pending and running task A instances be stopped, while still ensuring that task B runs?

Upvotes: 5

Views: 4818

Answers (2)

Chillar Anand
Chillar Anand

Reputation: 29514

Just get a list of id's of all the instances of A and stop them.

Consider this simple chord

from celery import chord 

my_chord = chord(a.si() for i in range(300))(b.si())

Now you can get a list of subtasks(all instaces of a task) from my_chord instance, using

for taks in my_chord.parent.subtasks:
    print(task.id)

Now you can do whatever you want to do with those task instaces. For example you can revoke all of the irrespective of their current state.

from celery.task.control import revoke

for task in my_chord.parent.subtasks:
    revoke(task.id, terminate=True)

revoke by default kills only pending tasks. But if you pass terminate=True to it, it kills the executing tasks also.

Also, chord's callback function will be called after all of its subtasks are executed successfully. Since you are cancelling the subtasks of chord, the call back function won't be called and the chord task results in failure. So, you have to retry the callback task.

Upvotes: 4

user2097159
user2097159

Reputation: 892

Instead of chording the tasks themselves you may want to consider having the chords tasks that watch the A tasks. What I mean by this is the chord would contain tasks that check the running tasks(A) every so often to see if they are done or revoked. When all of those return successfully the chord with then chain into task B

Upvotes: 0

Related Questions