Reputation: 3418
I'm running multiple simulations as tasks through Celery (version 2.3.2) from Django. The simulations are set up by another task:
In views.py:
result = setup_simulations.delay(parameters)
request.session['sim'] = result.task_id # Store main task id
In tasks.py:
@task(priority=1)
def setup_simulations(parameters):
    task_ids = []
    for i in range(number_of_simulations):
        result = run_simulation.delay(other_parameters)
        task_ids.append(result.task_id)
    return task_ids
After the initial task (setup_simulations) has finished, I try to revoke the simulation tasks as follows:
main_task_id = request.session['sim']
main_result = AsyncResult(main_task_id)
# Revoke sub tasks
from celery.task.control import revoke
for sub_task_id in main_result.get():
    sub_result = AsyncResult(sub_task_id)
    sub_result.revoke()  # Does not work
    # revoke(sub_task_id)  # Does not work either
When I look at the output from "python manage.py celeryd -l info", the tasks are executed as if nothing had happened. Does anybody have an idea what could have gone wrong?
Upvotes: 1
Views: 3896
Reputation: 19499
As you mention in the comment, revoke is a remote control command, so it is currently only supported by the amqp and redis transports.
With other transports, you can get the same effect yourself by storing a revoked flag in your database and checking it inside the task, e.g.:
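from celery import states
from celery import task
from celery.exceptions import Ignore
from myapp.models import RevokedTasks

@task
def foo():
    # Skip the work if this task's id has been flagged as revoked.
    if RevokedTasks.objects.filter(task_id=foo.request.id).count():
        if not foo.ignore_result:
            # Record the REVOKED state so callers can see what happened.
            foo.update_state(state=states.REVOKED)
        # Ignore tells the worker not to overwrite the state set above.
        raise Ignore()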
If your task is working on some model you could even store a flag in that.
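To make the flag idea concrete without needing a Django project or a broker, here is a minimal framework-free sketch. It assumes an in-memory sqlite3 table standing in for the RevokedTasks model; every name in it (make_store, mark_revoked, is_revoked, run_simulation_steps) is hypothetical and not part of Celery or Django. Unlike the snippet above, it also re-checks the flag between steps, which is one way a long-running simulation could stop partway through.

```python
import sqlite3


def make_store():
    """Create an in-memory table standing in for the RevokedTasks model (assumption)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE revoked_tasks (task_id TEXT PRIMARY KEY)")
    return conn


def mark_revoked(conn, task_id):
    """Flag a task as revoked; INSERT OR IGNORE makes repeated calls harmless."""
    conn.execute(
        "INSERT OR IGNORE INTO revoked_tasks (task_id) VALUES (?)", (task_id,)
    )
    conn.commit()


def is_revoked(conn, task_id):
    """Return True if the task id has been flagged."""
    row = conn.execute(
        "SELECT 1 FROM revoked_tasks WHERE task_id = ?", (task_id,)
    ).fetchone()
    return row is not None


def run_simulation_steps(conn, task_id, steps):
    """Run up to `steps` iterations, checking the flag before each one."""
    completed = 0
    for _ in range(steps):
        if is_revoked(conn, task_id):
            return ("REVOKED", completed)
        completed += 1  # placeholder for one unit of simulation work
    return ("SUCCESS", completed)
```

In a real task the view would call something like mark_revoked for each sub task id, and the task body would do the check against its own request id. Polling the database on every step has a cost, so checking every N steps may be a better trade-off for fine-grained simulations.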
Upvotes: 1