Reputation: 3370
I have some "heavy" requests to the database that I'm going to execute using Celery. Taking into account that they are "heavy" I want to execute them sequentially (one by one). One possible solution is to specify --concurrency=1
in the command line to Celery. And it works. But there is a problem: while the tasks are being executed all the following requests return None
:
from celery.task.control import inspect
# Inspect all nodes.
i = inspect()
print(i.scheduled()) # None
print(i.active()) # None
print(i.reserved()) # None
print(i.registered()) # None
Also, running celery inspect ping returns Error: No nodes replied within time constraint. So I can't get any information about the state of the Celery queue.
Here are my test Python modules:
celeryconfig.py
#BROKER_URL = 'redis://localhost:6379/0'
BROKER_URL = 'amqp://'
#CELERY_RESULT_BACKEND = "redis"
CELERY_RESULT_BACKEND = "amqp://"
# for php
CELERY_TASK_RESULT_EXPIRES = None
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACKS_LATE = True
tasks.py
from celery import Celery
from time import sleep
app = Celery('hello')
app.config_from_object('celeryconfig')
@app.task
def add(x, y):
    sleep(30)
    return x + y
client.py
from tasks import add

result = add.delay(4, 4)
result = add.delay(4, 4)
result = add.delay(4, 4)
result = add.delay(4, 4)
result = add.delay(4, 4)
result = add.delay(4, 4)
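For reference, the worker under test is started with a concurrency of 1. A typical invocation would look like the following (assuming the task module is named tasks as above; the exact command-line form varies slightly between Celery versions):
celery -A tasks worker --concurrency=1 --loglevel=info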
So, the question is: how can I run the tasks one by one AND still be able to check the state of the queue?
Upvotes: 2
Views: 2333
Reputation: 151390
Celery's inspections are performed by broadcasting a query to anything that listens and then gathering the responses. Any worker that does not respond within the timeout (which I think is 1 second by default) is ignored, as if it did not exist.
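One consequence is that you can give slow workers more time to answer by raising that timeout. A minimal sketch, assuming the inspect helper accepts a timeout keyword (in seconds) on your Celery version, which I'd double-check:
from celery.task.control import inspect

# Allow up to 5 seconds for workers to reply to the broadcast.
i = inspect(timeout=5)
print(i.ping())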
The fact that you use --concurrency=1 should not be an issue. I've just tried it and it worked fine here. Even with a concurrency of 1, a Celery worker will normally have an extra thread of execution for communications. (I say "normally" because I'm sure there are ways to configure Celery to shoot itself in the foot; what I say here holds with the defaults.) When I tried --concurrency=1, there were actually two threads per worker. So even if the worker is busy computing a task, there should be a thread able to respond to the broadcast.
That being said, if the machine is under heavy load, it may still take too long for the worker to respond. The way I've worked around this is to retry calls like i.scheduled() until I get an answer from everyone. In my projects, I know how many workers should be up and running, so I keep a list I can check to tell whether everyone has responded. A sketch of that retry loop follows below.
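A minimal sketch of that approach, using the same celery.task.control.inspect import as in the question; the worker node names here are hypothetical placeholders for your own list:
import time
from celery.task.control import inspect

# Hypothetical node names; replace these with the workers you expect
# to be up and running.
EXPECTED_WORKERS = {'celery@host1', 'celery@host2'}

def inspect_until_all_reply(method_name, retries=5, delay=1):
    # Call the given inspect method (e.g. 'active', 'scheduled') until
    # every expected worker has replied, or the retries run out.
    reply = None
    for _ in range(retries):
        reply = getattr(inspect(), method_name)()
        # reply is a dict keyed by worker node name, or None if no
        # worker answered within the broadcast timeout.
        if reply and EXPECTED_WORKERS.issubset(reply):
            return reply
        time.sleep(delay)
    return reply  # may be None or partial after exhausting the retries

print(inspect_until_all_reply('active'))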
Upvotes: 2