Reputation: 392
I may be using Celery incorrectly, but the chatbot I am developing requires Celery with Redis for async tasks. This is the framework I am using: http://microsoftbotframework.readthedocs.io/en/latest/asynctasks/.
My use case currently requires me to run a Celery task forever, waiting an arbitrary amount of time between iterations, anywhere from 30 minutes to 3 days. Something like this:
@celery.task
def myAsyncMethod():
    while True:
        timeToWait = getTimeToNextAlarm()
        sleep(timeToWait)
        sendOutMessages()
Basically, I have an async process which never exits. I am pretty sure I shouldn't be using Celery like this. So my question is: how do I create a Celery task which processes the current event, spawns the next task, submits it to the Celery queue, and then exits? Basically, something like this:
@celery.task
def myImprovedTask():
    timeToWait = getTimeToNextAlarm()
    sleep(timeToWait)
    sendOutMessages()
    myImprovedTask.delay()  # re-enqueue the same task for the next event
Not necessarily recursive or even like this, but something closer to the way Celery was originally intended to be used (for short-lived tasks, I believe?).
Tl;dr: How do I create a celery task from within another task and make the original task exit?
Do tell if I should explain it further. Thanks.
Upvotes: 4
Views: 7709
Reputation: 18697
If you want to run another task from your initial task, just call it as you usually would, with Task.delay() or Task.apply_async():
@celery.task
def myImprovedTask():
    timeToWait = getTimeToNextAlarm()
    sleep(timeToWait)
    sendOutMessages()
    myImprovedTask.delay()
It doesn't matter if you call the same task again. It gets enqueued with delay(), your original task returns, and then the next task in the queue gets run.
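To make the ordering concrete, here is a minimal sketch that imitates the pattern with the standard library only. It is not Celery: the deque stands in for the Redis-backed queue, worker() stands in for a single worker process, and my_improved_task, MAX_RUNS and events are hypothetical names introduced just for this illustration.

```python
from collections import deque

# In-memory stand-in for the broker queue (Celery would use Redis here).
task_queue = deque()
events = []   # records the order in which things happen
MAX_RUNS = 3  # hypothetical cap so the demo terminates


def my_improved_task(run_number):
    """One short-lived unit of work that schedules its own successor."""
    events.append(f"start {run_number}")
    if run_number < MAX_RUNS:
        # Analogous to myImprovedTask.delay(): enqueue only, do not call.
        task_queue.append((my_improved_task, run_number + 1))
    events.append(f"return {run_number}")


def worker():
    """Pop and run tasks one at a time, as a single worker would."""
    while task_queue:
        func, arg = task_queue.popleft()
        func(arg)


task_queue.append((my_improved_task, 1))
worker()
print(events)
```

Each run records its "return" before the next run records its "start", so no invocation stays alive waiting for its successor. In Celery itself, apply_async() also accepts a countdown argument in seconds, which may be worth considering instead of sleeping inside the task, though that goes beyond what the answer above proposes.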
All this assumes you are actually calling your Celery tasks asynchronously. Sometimes that's not the case, a common culprit being the task_always_eager config option. It is disabled by default, but (from the docs):
"If task_always_eager is True, all tasks will be executed locally by blocking until the task returns. apply_async() and Task.delay() will return an EagerResult instance, that emulates the API and behavior of AsyncResult, except the result is already evaluated. That is, tasks will be executed locally instead of being sent to the queue."
So, be sure your Celery config includes:
task_always_eager = False
Upvotes: 4
Reputation: 136
If the task isn't registered in the current process, you can use send_task() to call the task by name instead, as described in the docs: http://docs.celeryproject.org/en/latest/reference/celery.html#celery.Celery.send_task
app.send_task('task_name')
Doing it this way, you have to name the task explicitly, like:
@celery.task(name="myImprovedTask")
def myImprovedTask():
    ...  # task body goes here
And then you are able to call it with:
app.send_task('myImprovedTask')
If you do not like this approach (or the task is defined in the same file), you can also call it with apply_async or delay, like:
myImprovedTask.delay()
myImprovedTask.apply_async()
Upvotes: 0