Reputation: 331
Celery - bottom line: I want to get the task name by using the task id (I don't have a task object)
Suppose I have this code:
res = chain(add.s(4,5), add.s(10)).delay()
cache.save_task_id(res.task_id)
And then in some other place:
task_id = cache.get_task_ids()[0]
task_name = get_task_name_by_id(task_id) #how?
print(f'Some information about the task status of: {task_name}')
I know I can get the task name if I have a task object, as in the question "celery: get function name by task id?". But I don't have a task object (perhaps one can be created from the task_id or in some other way? I didn't see anything about that in the docs).
In addition, I don't want to save the task name in the cache. (Suppose I have a very long chain or other Celery primitives; I don't want to save all their names/task_ids. The last task_id alone should be enough to get the information about all the tasks, using .parent, etc.)
I looked at all the relevant methods of the AsyncResult and AsyncResult.backend objects. The only thing that seemed relevant is backend.get_task_meta(task_id), but that doesn't contain the task name. Thanks in advance.
PS: AsyncResult.name always returns None:
result = AsyncResult(task_id, app=celery_app)
result.name #Returns None
result.args #Also returns None
Upvotes: 10
Views: 9219
Reputation: 85
You first have to enable it in the Celery configuration:
from celery import Celery
celery_app = Celery()
...
celery_app.conf.update(result_extended=True)
Then, you can access it:
task = AsyncResult(task_id, app=celery_app)
task.name
Upvotes: 3
Reputation: 298
I found a good answer in this code snippet.
If and when you have an instance of AsyncResult, you do not need the task_id; you can simply do this:
result # instance of AsyncResult
result_meta = result._get_task_meta()
task_name = result_meta.get("task_name")
Of course this relies on a private method, so it's a bit hacky. I hope celery introduces a simpler way to retrieve this - it's especially useful for testing.
Upvotes: 0
Reputation: 81
This is not very clear from the docs for celery.result.AsyncResult, but not all of its properties are populated unless you set result_extended = True. From the configuration docs:
result_extended
Default: False
Enables extended task result attributes (name, args, kwargs, worker, retries, queue, delivery_info) to be written to backend.
Then the following will work:
result = AsyncResult(task_id)
result.name    # 'project.tasks.my_task'
result.args    # [2, 3]
result.kwargs  # {'a': 'b'}
Also be aware that the rpc:// backend does not store this data; you will need Redis or similar. If you are using rpc, you will still get None returned, even with result_extended = True.
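To make the two requirements above concrete, here is a minimal sketch of a settings module. The file name celeryconfig.py and the Redis URLs are placeholders I am assuming for illustration, not something from the original setup; only result_extended and the need for a non-rpc backend come from the answer itself.

```python
# celeryconfig.py: hypothetical settings module; the Redis URLs are placeholders.
broker_url = "redis://localhost:6379/0"

# A backend that persists result metadata (the rpc:// backend does not).
result_backend = "redis://localhost:6379/1"

# Also write name, args, kwargs, worker, retries, queue to the backend.
result_extended = True
```

If you keep settings in a module like this, it can be loaded with celery_app.config_from_object("celeryconfig"). Only tasks that run after the setting is enabled should be expected to carry the extra fields.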
Upvotes: 2
Reputation: 331
Finally found an answer.
For anyone wondering:
You can solve this by setting result_extended = True in your Celery config.
Then:
result = AsyncResult(task_id, app=celery_app)
result.task_name #tasks.add
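For completeness, a minimal sketch of the get_task_name_by_id helper from the question. It assumes result_extended is enabled and the result backend actually stores the extended fields. The app parameter is my addition so the helper does not depend on a global, and it reads the name attribute listed in the configuration docs.

```python
def get_task_name_by_id(task_id, app):
    """Return the task name stored for task_id, or None if the backend has none.

    `app` is the Celery application instance. The name is only available
    when result_extended is enabled and the backend persists it.
    """
    result = app.AsyncResult(task_id)  # builds a result handle from the id alone
    return result.name
```

With this in place, the line from the question becomes task_name = get_task_name_by_id(task_id, celery_app). A None return usually means the extended fields were never written for that task.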
Upvotes: 9
Reputation: 19822
Something like the following (pseudocode) should be enough:
from celery import Celery
app = Celery("myapp") # add your parameters here
task_id = "6dc5f968-3554-49c9-9e00-df8aaf9e7eb5"
aresult = app.AsyncResult(task_id)
task_name = aresult.name
task_args = aresult.args
print(task_name, task_args)
Unfortunately, it does not work (I would say it is a bug in Celery), so we have to find an alternative. The first thing that came to my mind was that the Celery CLI has an inspect query_task feature, which hinted that it should be possible to find the task name through the inspect API, and I was right. Here is the code:
# Since the expected way does not work we need to use the inspect API:
insp = app.control.inspect()
task_ids = [task_id]
inspect_result = insp.query_task(*task_ids)
# print(inspect_result)
for node_name in inspect_result:
    val = inspect_result[node_name]
    if val:
        # we found the node that executes the task
        arr = val[task_id]
        state = arr[0]
        meta = arr[1]
        task_name = meta["name"]
        task_args = meta["args"]
        print(task_name, task_args)
The problem with this approach is that it works only while a worker still knows about the task, for example while it is running. Once the task is done, the code above no longer finds it.
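The lookup above can be wrapped in a small function. This is only a sketch under the assumption that the reply has the shape used in the loop: a dict of node name to {task_id: [state, meta]}. The function name find_task_info is hypothetical, and it also tolerates an empty reply, which you may get when no worker answers.

```python
def find_task_info(inspect_result, task_id):
    """Return (state, name, args) for task_id from a query_task reply, or None.

    `inspect_result` is expected to map node names to {task_id: [state, meta]}.
    """
    for node_name, tasks in (inspect_result or {}).items():
        # Nodes that do not know the task reply with an empty mapping.
        if tasks and task_id in tasks:
            state, meta = tasks[task_id]
            return state, meta.get("name"), meta.get("args")
    return None
```

Usage would look like find_task_info(insp.query_task(task_id), task_id). A None result means no worker currently knows the task, which matches the limitation described above.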
Upvotes: 1