Hitul Mistry

Reputation: 2275

In Celery, how do I get the task status for all tasks with a specific task name?

In Celery, I want to get the status of all tasks that have a specific task name. To do that, I tried the code below.

import celery.events.state

# Celery state instance.
stat = celery.events.state.State()

# tasks_by_type should return the tasks with the given name.
query = stat.tasks_by_type("my_task_name")

# Print tasks.
print(query)

However, this code gives me an empty list.

Upvotes: 10

Views: 7535

Answers (3)

Shahzaib Ali

Reputation: 412

In Celery, you can look up the status of a task through its task ID, which also works when you need to check it from another function.

Sample code:

from celery import shared_task

@shared_task(name='Sum_of_digits')
def ABC(x, y):
    return x + y

Queue the task for processing:

res = ABC.delay(1, 2)

Now use the result object res to fetch the state, status, and result (res.get()):

print(f"id={res.id}, state={res.state}, status={res.status}")

Upvotes: 0

Leonardo.Z

Reputation: 9801

celery.events.state.State() is a data structure used to keep track of the state of Celery workers and tasks. Calling State() gives you an empty state object with no data.

You should use app.events.Receiver (stream processing) or celery.events.snapshot (batch processing) to capture state that contains tasks.

Sample Code:

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)
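
To illustrate why a fresh state object is empty and how it fills up once events arrive, here is a minimal toy model. It is not Celery's State class: ToyState, EVENT_TO_STATE, and the sample uuids are hypothetical names made up for this sketch, and only the event type strings and state labels mirror Celery's. It assumes, as the comment in the monitor above says, that the task name arrives only with the received event and has to be remembered for later events.

```python
from collections import defaultdict

# Hypothetical mapping from event type to a task state label.
EVENT_TO_STATE = {
    "task-received": "RECEIVED",
    "task-started": "STARTED",
    "task-succeeded": "SUCCESS",
    "task-failed": "FAILURE",
}


class ToyState:
    """Toy stand-in for an event-fed state object (not Celery's State)."""

    def __init__(self):
        self.tasks = {}                    # uuid -> {"name": ..., "state": ...}
        self._by_type = defaultdict(list)  # task name -> [uuid, ...]

    def event(self, event):
        """Merge one event dict into the tracked state."""
        task = self.tasks.setdefault(event["uuid"], {"name": None, "state": None})
        # Only the "received" event carries the task name; remember it.
        if "name" in event and task["name"] is None:
            task["name"] = event["name"]
            self._by_type[event["name"]].append(event["uuid"])
        task["state"] = EVENT_TO_STATE.get(event["type"], task["state"])

    def tasks_by_type(self, name):
        """Return (uuid, state) pairs for every task seen with this name."""
        return [(uuid, self.tasks[uuid]["state"])
                for uuid in self._by_type.get(name, [])]


state = ToyState()
empty_result = state.tasks_by_type("my_task_name")  # nothing fed yet

for ev in [
    {"type": "task-received", "uuid": "a1", "name": "my_task_name"},
    {"type": "task-received", "uuid": "b2", "name": "other_task"},
    {"type": "task-succeeded", "uuid": "a1"},
    {"type": "task-received", "uuid": "c3", "name": "my_task_name"},
    {"type": "task-failed", "uuid": "c3"},
]:
    state.event(ev)

statuses = state.tasks_by_type("my_task_name")
print(empty_result, statuses)
```

The lookup before any events is empty, which matches what the question ran into. In a real monitor the events would come from the Receiver rather than a hard-coded list.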

Upvotes: 3

Nino Walker

Reputation: 2903

This isn't natively supported. Depending on the backend (Mongo, Redis, etc.), you may or may not be able to introspect the contents of a queue and find out what's in it. Even if you can, you'll miss items currently in progress.

That said, you could manage this yourself:

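from celery.result import AsyncResult

# my_datastore is a placeholder for whatever storage you use.
result = mytask.delay(...)
my_datastore.save("mytask", result.id)
...
for task_id in my_datastore.find(task="mytask"):
    res = AsyncResult(task_id)
    print(res.state)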
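
As a rough sketch of what my_datastore could look like, here is a minimal in-memory version. TaskIdStore, states_for, and fake_backend are hypothetical names made up for this example. The state lookup is passed in as a callable so the sketch runs without a broker; with Celery you would pass something like lambda i: AsyncResult(i).state instead.

```python
from collections import defaultdict


class TaskIdStore:
    """In-memory stand-in for the answer's my_datastore (hypothetical)."""

    def __init__(self):
        self._ids = defaultdict(list)  # task name -> [task id, ...]

    def save(self, task_name, task_id):
        """Record that a task with this name was submitted under task_id."""
        self._ids[task_name].append(task_id)

    def find(self, task):
        """Return a copy of the IDs recorded for the given task name."""
        return list(self._ids.get(task, []))


def states_for(store, task_name, get_state):
    """Map each recorded task ID to its state via the get_state callable."""
    return {task_id: get_state(task_id) for task_id in store.find(task=task_name)}


# Stand-in for the result backend; these IDs and states are invented.
fake_backend = {"id-1": "SUCCESS", "id-2": "PENDING", "id-3": "FAILURE"}

store = TaskIdStore()
store.save("mytask", "id-1")
store.save("mytask", "id-2")
store.save("othertask", "id-3")

mytask_states = states_for(store, "mytask", fake_backend.get)
print(mytask_states)
```

An in-memory dict only works inside a single process. If tasks are submitted from one place and checked from another, the IDs would need to go into shared storage such as a database table or a Redis set.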

Upvotes: 1
