Reputation: 2275
In Celery, I want to get the status of all tasks that have a specific task name. To do that, I tried the code below.
import celery.events.state
# Celery status instance.
stat = celery.events.state.State()
# tasks_by_type should return the tasks with the given name.
query = stat.tasks_by_type("my_task_name")
# Print tasks.
print(query)
However, this code gives me an empty list.
Upvotes: 10
Views: 7535
Reputation: 412
In Celery, you can look up the status of a task through its task ID, which also works if you need to check it from another function.
Sample code:
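from celery import shared_task

# shared_task is used here so the snippet does not depend on a particular app object.
@shared_task(name='Sum_of_digits')
def ABC(x, y):
    return x + y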
Queue the task for processing:
res = ABC.delay(1, 2)
Then use the returned result object res to fetch the state, the status, and the result (res.get()):
print(f"id={res.id}, state={res.state}, status={res.status}")
Upvotes: 0
Reputation: 9801
celery.events.state.State() is a data structure used to keep track of the state of Celery workers and tasks. When you call State(), you get an empty state object with no data.
You should use app.events.Receiver (stream processing) or celery.events.snapshot (batch processing) to capture state that contains tasks.
Sample Code:
from celery import Celery


def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])
        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'task-failed': announce_failed_tasks,
            '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)


if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)
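To illustrate why a fresh state object is empty and how events fill it, here is a minimal pure-Python sketch. TinyState is a hypothetical, heavily simplified stand-in and not Celery's actual implementation; it only assumes the documented task event types and that the task name arrives with the task-received event.

```python
# Event type -> task state, following Celery's documented task events.
EVENT_TO_STATE = {
    "task-received": "RECEIVED",
    "task-started": "STARTED",
    "task-succeeded": "SUCCESS",
    "task-failed": "FAILURE",
    "task-retried": "RETRY",
    "task-revoked": "REVOKED",
}


class TinyState:
    """Hypothetical, simplified stand-in for an event-fed state object."""

    def __init__(self):
        self.names = {}   # uuid -> task name (learned from task-received)
        self.states = {}  # uuid -> most recent state

    def event(self, event):
        state = EVENT_TO_STATE.get(event["type"])
        if state is None:
            return  # not a task event we track (e.g. worker heartbeats)
        uuid = event["uuid"]
        if "name" in event:
            self.names[uuid] = event["name"]
        self.states[uuid] = state

    def tasks_by_type(self, name):
        # Return (uuid, state) pairs for every task seen with this name.
        return [(u, self.states[u]) for u, n in self.names.items() if n == name]


state = TinyState()
print(state.tasks_by_type("my_task_name"))  # [] because no events were fed in

for ev in [
    {"type": "task-received", "uuid": "a", "name": "my_task_name"},
    {"type": "task-started", "uuid": "a"},
    {"type": "task-received", "uuid": "b", "name": "other_task"},
    {"type": "task-failed", "uuid": "a"},
]:
    state.event(ev)

print(state.tasks_by_type("my_task_name"))  # [('a', 'FAILURE')]
```

With the real State, the same idea applies: register state.event as the '*' handler on a Receiver, and query the state only after events have been captured.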
Upvotes: 3
Reputation: 2903
This isn't natively supported. Depending on the backend (Mongo, Redis, etc.), you may or may not be able to introspect the contents of a queue and find out what's in it. Even if you can, you'll miss items that are currently in progress.
That said, you could manage this yourself:
from celery.result import AsyncResult

result = mytask.delay(...)
my_datastore.save("mytask", result.id)
...
for id in my_datastore.find(task="mytask"):
    res = AsyncResult(id)
    print(res.state)
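As a rough sketch of what such a datastore could look like, here is a minimal in-memory version. TaskIdStore and states_for are hypothetical names made up for illustration; a real setup would need a store shared between processes (a database or Redis, for example), and the state lookup assumes a result backend is configured.

```python
from collections import defaultdict


class TaskIdStore:
    """Hypothetical in-memory stand-in for my_datastore."""

    def __init__(self):
        self._ids = defaultdict(list)  # task name -> list of task ids

    def save(self, task_name, task_id):
        self._ids[task_name].append(task_id)

    def find(self, task):
        # Return a copy so callers cannot mutate the stored list.
        return list(self._ids.get(task, []))


def states_for(store, task_name, lookup_state):
    """Return {task_id: state} for every recorded id of task_name.

    lookup_state is any callable mapping a task id to a state string.
    With Celery it could be: lambda task_id: AsyncResult(task_id).state
    """
    return {task_id: lookup_state(task_id) for task_id in store.find(task_name)}


# Demo with a fake lookup table instead of a live result backend.
store = TaskIdStore()
store.save("mytask", "id-1")
store.save("mytask", "id-2")
store.save("othertask", "id-3")

fake_backend = {"id-1": "SUCCESS", "id-2": "PENDING", "id-3": "FAILURE"}
print(states_for(store, "mytask", fake_backend.get))
# {'id-1': 'SUCCESS', 'id-2': 'PENDING'}
```

Passing the lookup in as a callable keeps the bookkeeping separate from Celery, so the same store works whether the state comes from AsyncResult or from somewhere else.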
Upvotes: 1