Reputation: 9921
I am using Celery to send a task to a remote server and am trying to get the result back. The task's state is continuously updated via the update_state method on the remote server.
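For reference, the remote task reports progress roughly like this (a minimal sketch; the task name and meta contents are illustrative):

@app.task(bind=True)
def long_remote_task(self):
    for step in range(100):
        # ... do one unit of work ...
        self.update_state(state='PROGRESS', meta={'current': step})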
I am sending the task using:
app.send_task('task_name')
Getting the result of a Celery task is a blocking call, and I don't want my Django app to wait for the result and time out.
So I tried running another Celery task to fetch the result:
from celery.result import AsyncResult

@app.task(ignore_result=True)
def catpure_res(task_id):
    task_obj = AsyncResult(task_id)
    task_obj.get(on_message=on_msg)  # on_msg is a progress callback defined elsewhere
But it results in the error below.
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 367, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 622, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/arpit/project/appname/tasks/results.py", line 42, in catpure_res
    task_obj.get(on_message=on_msg)
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 168, in get
    assert_will_not_block()
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 44, in assert_will_not_block
    raise RuntimeError(E_WOULDBLOCK)
RuntimeError: Never call result.get() within a task!
See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks
Is there any workaround for this error? Do I have to run a daemon process to get the results?
Upvotes: 21
Views: 16276
Reputation: 380
Wrap the get() call in allow_join_result:

from celery.exceptions import TimeoutError
from celery.result import allow_join_result

task_obj = app.send_task("do_something", [arg1, arg2, arg3])

def on_msg(*args, **kwargs):
    print(f"on_msg: {args}, {kwargs}")

with allow_join_result():
    try:
        result = task_obj.get(on_message=on_msg, timeout=timeout_s)
    except TimeoutError:
        print("Timeout!")
Upvotes: 1
Reputation: 575
Use allow_join_result. See the snippet below.
from celery.result import AsyncResult, allow_join_result

@app.task(ignore_result=True)
def catpure_res(task_id):
    task_obj = AsyncResult(task_id)
    with allow_join_result():
        task_obj.get(on_message=on_msg)
Note: as mentioned in the other answers, this can cause performance issues and even deadlocks, but if your task is well written and doesn't cause unexpected errors then it should work like a charm.
Upvotes: 30
Reputation: 11337
As your title explains, calling get within a task is bad practice and can lead to deadlocks. Instead, check the task's status and fetch its result whenever it is ready:
result = catpure_res.AsyncResult(task_id, app=app)
if result.ready():
    return result.get()
return result.state
You can wrap the above snippet in a function and request it every x seconds.
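For example, as a small Django view that the client polls every few seconds (the view name and JSON shape are illustrative; app and catpure_res are from the question):

from django.http import JsonResponse

def task_status(request, task_id):
    # Return the result once finished, otherwise just the current state.
    result = catpure_res.AsyncResult(task_id, app=app)
    if result.ready():
        return JsonResponse({'state': result.state, 'result': result.get()})
    return JsonResponse({'state': result.state})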
EDIT: Regarding your comment: you can read result.state instead, and use the retry mechanism with countdown until result.state == SUCCESS.
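A minimal sketch of that retry approach (the task name is hypothetical and the countdown value is illustrative; note it reads .result rather than calling .get(), which would raise the same error inside a task):

from celery.result import AsyncResult

@app.task(bind=True, max_retries=None)
def check_task(self, task_id):
    result = AsyncResult(task_id, app=app)
    if result.state != 'SUCCESS':
        # Not finished yet: re-enqueue this check to run again in 5 seconds.
        raise self.retry(countdown=5)
    return result.result  # non-blocking, unlike .get()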
You can add celery beat to run a periodic task that checks whether the primary task has ended.
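A minimal celery beat configuration for that (the schedule name, task path, and interval are all illustrative):

app.conf.beat_schedule = {
    'check-primary-task': {
        # Hypothetical periodic task that looks up pending task ids
        # (e.g., stored in the database) and checks their state.
        'task': 'appname.tasks.results.check_pending_tasks',
        'schedule': 10.0,  # every 10 seconds
    },
}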
Note that having such a heavy (long-running) task is also bad practice. Consider breaking it apart into smaller tasks and using canvas to combine them.
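For example, with a chain (the step tasks are hypothetical):

from celery import chain

# Hypothetical breakdown of one heavy task into three smaller ones.
workflow = chain(step_one.s(arg1), step_two.s(), step_three.s())
workflow.apply_async()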
Upvotes: 5