Random Someone

Reputation: 392

How to spawn a celery task from a previous celery task?

I may be using celery incorrectly. But the chatbot I am developing requires celery with redis for async tasks. This is the framework I am using: http://microsoftbotframework.readthedocs.io/en/latest/asynctasks/.

My particular use case currently requires me to run a celery task forever, waiting some arbitrary amount of time in between, ranging from 30 minutes to 3 days. Something like this:

from time import sleep

@celery.task
def myAsyncMethod():
    while True:
        timeToWait = getTimeToNextAlarm()
        sleep(timeToWait)    # block the worker until the next alarm
        sendOutMessages()

Basically, I have an async process which never exits. I am pretty sure I shouldn't be using celery like this. So my question is: how do I create a celery task which processes the first piece of work, spawns a new task, submits it to the celery queue, and exits? Basically, something like this:

@celery.task
def myImprovedTask():
    timeToWait = getTimeToNextAlarm()
    sleep(timeToWait)
    sendOutMessages()
    myImprovedTask.delay()    # recursive call to async method for next event

It doesn't have to be recursive, or even like this, but something along the lines of how celery is originally intended to be used (for short-lived tasks, I believe?).

Tl;dr: How do I create a celery task from within another task and make the original task exit?

Do tell if I should explain it further. Thanks.

Upvotes: 4

Views: 7709

Answers (2)

randomir

Reputation: 18697

If you want to run another task from your initial task, just call it as you usually would, with Task.delay() or Task.apply_async():

from time import sleep

@celery.task
def myImprovedTask():
    timeToWait = getTimeToNextAlarm()
    sleep(timeToWait)
    sendOutMessages()
    myImprovedTask.delay()    # enqueue the next run and return immediately

It doesn't matter that you're calling the same task again. It gets enqueued with delay(), your original task returns, and then the next task in the queue gets run.
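As an aside, if blocking a worker in sleep() for up to three days bothers you, apply_async() also takes a countdown argument (in seconds), which lets the broker hold the follow-up task until it's due instead of tying up a worker. A minimal sketch, still assuming your getTimeToNextAlarm() and sendOutMessages() helpers:

@celery.task
def myImprovedTask():
    sendOutMessages()
    # countdown= defers execution broker-side, so no worker sits in sleep()
    myImprovedTask.apply_async(countdown=getTimeToNextAlarm())

Kick off the first run the same way, with apply_async(countdown=...). Note that with the Redis broker, countdowns longer than the visibility_timeout can cause the task to be redelivered, so check that setting if your delays span days.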


All this is under the assumption that you are actually calling your Celery tasks asynchronously. Sometimes that's not the case; a common culprit is the task_always_eager config option. It's disabled by default, but (from the docs):

If task_always_eager is True, all tasks will be executed locally by blocking until the task returns. apply_async() and Task.delay() will return an EagerResult instance, that emulates the API and behavior of AsyncResult, except the result is already evaluated.

That is, tasks will be executed locally instead of being sent to the queue.

So, be sure your Celery config includes:

task_always_eager = False
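For context, a minimal sketch of an app configured this way, where the app name and broker URL are placeholders for your setup:

from celery import Celery

app = Celery('chatbot', broker='redis://localhost:6379/0')
app.conf.task_always_eager = False    # send tasks to the queue instead of running them inline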

Upvotes: 4

Kenjin

Reputation: 136

If the task isn't registered in the current process, you can use send_task() to call the task by name instead, as documented here: http://docs.celeryproject.org/en/latest/reference/celery.html#celery.Celery.send_task

app.send_task('task_name')

Doing it this way, you have to name the tasks explicitly:

@celery.task(name="myImprovedTask")
def myImprovedTask():
    ...

And then you are able to call it with:

app.send_task('myImprovedTask')
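send_task() also forwards positional/keyword arguments and the usual execution options such as countdown; a small sketch (the 60-second countdown is purely illustrative):

# schedule the named task to run in 60 seconds; args/kwargs would be
# passed through to the task if it took any
app.send_task('myImprovedTask', countdown=60)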

If you do not like this approach (or the task is defined in the same file), you can also call it with apply_async() or delay():

myImprovedTask.delay()
myImprovedTask.apply_async()

Upvotes: 0
