Motiejus Jakštys
Motiejus Jakštys

Reputation: 2979

celery issuing tasks from subtasks

My "big" task works in steps: it can either terminate or spawn more tasks. In my example, I count to 5.

from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
from time import sleep

@app.task
def slow_add(x):
    """Slowly counts to 5"""
    sleep(1)
    print (x)
    if x == 5:
        return x
    else:
        return slow_add.s(x+1)()

When I schedule the task, I get only one invocation:

In [48]: asks.slow_add.run(1)
1
2
3
4
5
Out[48]: 5
  1. How to call it asynchronously? I tried different variations of apply_async, delay, but no avail.
  2. I see no tasks in the celery monitor. Why?
  3. How can I get intermediate state (in this case, the number between 1 and 5) while the task is asynchronously executing?

Upvotes: 2

Views: 1313

Answers (1)

Syed Habib M
Syed Habib M

Reputation: 1817

In My example,

@app.task
def test(x):
    import time
    time.sleep(1)
    print x
    if x == 5:
        return x
    else:
        return test.delay(x+1)
  1. It works fine asynchronously when I used res = test.delay(1)
  2. In Celery monitor I can see task received

    Received task: services.tasker.test[67f22e02-7c39-4c1d-a646-acabeb72d208]
    1
    Received task: services.tasker.test[9eed6d45-4931-4790-8477-3bbe75e213e4]
    Task services.tasker.test[67f22e02-7c39-4c1d-a646-acabeb72d208] succeeded in 3.021544273s: None
    2
    ...

  3. You can get task state using res.ready(), it returns True when the task is finished otherwise it returns False

In my test case, the result are like

>>> res = test.delay(1)
>>> res.ready()  # before finishing task
False
>>> res.ready()  # after finishing task
True

or

def final_result(r):
    if isinstance(r.result, celery.result.AsyncResult):
         return final_result(r.result)
    else:
        return r.result

and use above like

>>> print final_result(res)
5

Upvotes: 2

Related Questions