Denis V

Reputation: 3370

Sequential task execution in Celery

I have some "heavy" database queries that I'm going to execute using Celery. Since they are "heavy", I want to execute them sequentially (one by one). One possible solution is to pass --concurrency=1 to Celery on the command line, and it works. But there is a problem: while a task is being executed, all of the following inspection calls return None:

from celery.task.control import inspect

# Inspect all nodes.
i = inspect()

print(i.scheduled()) # None
print(i.active()) # None
print(i.reserved()) # None
print(i.registered()) # None

Also, running celery inspect ping returns Error: No nodes replied within time constraint. As a result, I can't get any information about the state of the Celery queue.
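For reference, a worker invocation consistent with the setup above (the exact command is an assumption; the module name tasks comes from tasks.py below) would be:

celery -A tasks worker --concurrency=1 --loglevel=info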

Here are my test Python modules:

celeryconfig.py

#BROKER_URL = 'redis://localhost:6379/0'
BROKER_URL = 'amqp://'
#CELERY_RESULT_BACKEND = "redis"
CELERY_RESULT_BACKEND = "amqp://"
# for php
CELERY_TASK_RESULT_EXPIRES = None
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACKS_LATE = True

tasks.py

from celery import Celery
from time import sleep

app = Celery('hello')
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    sleep(30)  # simulate a heavy database query
    return x + y

client.py

from tasks import add

# Queue six tasks; with --concurrency=1 they should execute one by one.
for _ in range(6):
    result = add.delay(4, 4)

So, the question is: how do I run the tasks one by one AND still be able to check the state of the queue?

Upvotes: 2

Views: 2333

Answers (1)

Louis

Reputation: 151390

Celery's inspections are performed by broadcasting a query to every worker that is listening and then gathering the responses. Any worker that does not respond within the timeout (which I think is 1 second by default) is ignored, as if it did not exist.
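If one second is too short for your environment, you can pass a longer timeout to inspect(). A minimal sketch (the 5-second value is arbitrary):

from celery.task.control import inspect

# Give busy workers more time to reply to the broadcast.
i = inspect(timeout=5)
print(i.ping())

The CLI equivalent is celery inspect ping --timeout=5.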

The fact that you use --concurrency=1 should not be an issue. I've just tried it and it worked fine here. Even with a concurrency of 1, a Celery worker will normally have an extra thread of execution for communications. (I say "normally" because I'm sure there are ways to configure Celery to shoot itself in the foot; what I say holds with the defaults.) When I tried --concurrency=1, there were actually two threads per worker. So even while the worker is busy computing a task, there should be a thread able to respond to the broadcast.

This being said, if the machine is under heavy load, it may take too long for the worker to respond. The way I've worked around this is to retry calls like i.scheduled() until I get an answer from everyone, as sketched below. In my projects, I know how many workers should be up and running, so I keep a list I can use to check whether everyone has responded.
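Something along these lines (the worker name, retry count, and delay are made up for illustration; adapt them to your deployment):

from time import sleep

from celery.task.control import inspect

# Hypothetical set of workers expected to be running.
EXPECTED = {'celery@worker1'}

def wait_for_replies(retries=10, delay=1):
    """Retry pinging until every expected worker has replied."""
    i = inspect()
    replies = {}
    for _ in range(retries):
        replies = i.ping() or {}  # None means nobody replied in time
        if EXPECTED <= set(replies):
            return replies
        sleep(delay)
    raise RuntimeError('No reply from: %s' % (EXPECTED - set(replies)))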

Upvotes: 2
