yohjp
yohjp

Reputation: 3125

How can I get revoked flag for current task in Celery worker?

I want to implement task cancellation with cleanup process on Celery + RabbitMQ broker. How can I get "REVOKED" status of current task in Celery worker?

# tasks.py -- celery worker
from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
  for i in range(0, 10):
    time.sleep(1)
    # I want check here for cleanup.
  return x + y

# caller.py
from tasks import add
result = add.delay(4, 4)
result.revoke()

Celery supports Abortable tasks, but it only works the database backend.

Python 3.4.1 / Celery 3.1.17 / RabbitMQ 3.4.4

Upvotes: 1

Views: 3332

Answers (2)

boses
boses

Reputation: 410

Felippe Da Motta Raposo's suggestion works inside my custom task:

from celery import Task
from celery.task.control import inspect

WORKER_NAME = "celery@server"
inspector = inspect([WORKER_NAME])

class CustomTask(Task):
    def _is_revoked(self):
        revoked_list = inspector.revoked()
        return (revoked_list and self.task_id in revoked_list[WORKER_ADDRESS]

    def run(self, *args, **kwargs):
        self.task_id = self.request.id

Upvotes: 0

Felippe Raposo
Felippe Raposo

Reputation: 431

Take a look at the scheduled_tasks, you can ask celery if your task is scheduled to run.

EX.:

    import celery
    celery_inspect = celery.current_app.control.inspect()
    celery_inspect.registered_tasks()

This method returns a dict with all celery scheduled tasks by worker.

Upvotes: -1

Related Questions