Arthurmoreno

Reputation: 63

How to make a celery task call asynchronous tasks?

I have a Django application that needs to run an optimization algorithm. This algorithm is composed of two parts: the first part is an evolutionary algorithm, and it calls a certain number of tasks of the second part, which is a simulated annealing algorithm. The problem is that Celery doesn't allow a task to call an asynchronous task. I have tried the code below:

# inside the evolutionary-algorithm task:
sa_list = []
for cromossomo in self._populacao:
    sa_list.append(simulated_annealing_t.s(
        cromossomo.to_JSON(), self._NR, self._T,
        get_mutacao_str(self._mutacao_SA), self._argumentos))

job = group(sa_list)

result = job.apply_async()
resultados = result.get()  # blocking on subtask results inside a task

This code is part of the evolutionary algorithm, which is itself a Celery task. When I try to run it, Celery shows this message:

[2015-12-02 16:20:15,970: WARNING/Worker-1] /home/arthur/django-user/local/lib/python2.7/site-packages/celery/result.py:45: RuntimeWarning: Never call result.get() within a task! See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks

In Celery 3.2 this will result in an exception being raised instead of just being a warning.

Despite this being just a warning, Celery seems to fill up with tasks and locks.

I have searched for a lot of solutions, but none of them worked.

Upvotes: 4

Views: 8478

Answers (2)

janrn

Reputation: 80

The problem is not that Celery doesn't allow the execution of async tasks in your example, but that you'll run into a deadlock, hence the warning:

Let's assume you have a task A that spawns a number of subtasks B through apply_async(). Each of those tasks is executed by a worker. The problem is that if the number of B tasks is larger than the number of available workers, and task A is still waiting for their results (as in your example, at least - it doesn't wait by default), then the workers that have executed a B task will not pick up another one; they are blocked until task A is finished. (I don't know exactly why, but I had this problem just a few weeks ago.)

This means that Celery can't execute anything until you manually shut down the workers.

Solutions

This depends entirely on what you want to do with your task results. If you need them to execute the next subtask, you can chain them through linking with callbacks or by hardcoding it into the respective tasks (so that the first calls the second, and so on).
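As a rough sketch of chaining (the task names, values and broker URL here are made up for illustration):

from celery import Celery, chain

app = Celery('tasks', broker='redis://localhost')  # assumed broker

@app.task
def step_one(x):
    # first stage; its return value is passed to the next task in the chain
    return x * 2

@app.task
def step_two(y):
    # receives step_one's result as its first argument
    return y + 1

# link the tasks; neither of them blocks waiting for the other
chain(step_one.s(10), step_two.s()).apply_async()

The same wiring works with the link option: step_one.apply_async((10,), link=step_two.s()).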

If you only need to see whether they executed successfully or not, you can use Flower to monitor your tasks.

If you need to process the output of all the subtasks further, I recommend writing the results to an XML file: have task A call all the B tasks, and once they are done, execute a task C that processes the results. Maybe there are more elegant solutions, but this avoids the deadlock for sure.
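A rough sketch of that A -> B -> C pattern, with invented paths and task names (and JSON instead of XML, for brevity):

import json
import os

from celery import Celery, group

app = Celery('tasks', broker='redis://localhost')  # assumed broker
RESULTS_DIR = '/tmp/results'  # assumed shared directory

@app.task
def task_b(index, value):
    # stand-in for the real computation; each B task writes its own file
    with open(os.path.join(RESULTS_DIR, '{0}.json'.format(index)), 'w') as f:
        json.dump(value ** 2, f)

@app.task
def task_c():
    # run this once all B tasks are done; it reads every result back
    results = []
    for name in os.listdir(RESULTS_DIR):
        with open(os.path.join(RESULTS_DIR, name)) as f:
            results.append(json.load(f))
    return sum(results)

@app.task
def task_a(values):
    # fire off all B tasks and return immediately - no result.get() here
    group(task_b.s(i, v) for i, v in enumerate(values)).apply_async()

How task C gets triggered once the group has finished is the remaining piece; a chord (see the other answer) is one way to do it.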

Upvotes: 1

scytale

Reputation: 12641

One way to deal with this is to use a two-stage pipeline:

from celery import group

@app.task
def first_task():
    # the self._... attributes are from the question's class; in a real
    # task you would pass that state in as task arguments
    sa_list = []
    for cromossomo in self._populacao:
        sa_list.append(simulated_annealing_t.s(
            cromossomo.to_JSON(), self._NR, self._T,
            get_mutacao_str(self._mutacao_SA), self._argumentos))

    job = group(sa_list)

    result = job.apply_async()
    result.save()      # persist the GroupResult so it can be restored later
    return result.id   # hand the group's id back to the caller

Then call it like this:

from path.to.tasks import app, first_task

result_1 = first_task.apply_async()
result_2_id = result_1.get()  # safe here: we are not inside a task

# rebuild the GroupResult from its id and collect the subtask results
result_2 = app.GroupResult.restore(result_2_id)
resultados = result_2.get()
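Note that result.save() and GroupResult.restore() require a result backend to be configured for the app; without one there is nowhere to store the group's metadata.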

There are other ways to do this that involve more work - for example, you could use a chord to gather the results of the group, as sketched below.
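A minimal sketch of the chord variant, reusing sa_list from the snippet above (process_results is a made-up callback task):

from celery import chord

@app.task
def process_results(resultados):
    # called exactly once with the list of all subtask results
    return resultados

# instead of group(sa_list).apply_async() followed by get(),
# hand the same signatures to a chord with a callback:
chord(sa_list)(process_results.s())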

Upvotes: 1
