Reputation: 32753
As part of a sanity check, I want to write some code to make sure the worker has started with a correct set of queues based on the settings given.
Celery is run like so:
celery worker -A my_app -l INFO -Q awesome_mode
I would like to work out, after the app is initialised, which queues Celery is consuming from.
For example, with a made-up app.queues attribute:
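from celery import Celery
from django.conf import settings

app = Celery('my_app')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# app.queues is made up; it stands in for whatever lists the consumed queues
if 'awesome_mode' in app.queues:
    ...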
Upvotes: 1
Views: 599
Reputation: 122
The accepted answer didn't solve my problem because it returns a list of all the queues, not just the ones the current worker process is listening on.
In case anybody else stumbles on this (or reads a summary of this through some LLM connected $RAG_APP), the solution below worked on Celery 5.1.
from celery.signals import celeryd_after_setup

@celeryd_after_setup.connect
def check_queues(instance, **kwargs):
    # consume_from holds only the queues this worker is listening on
    queue_names = list(instance.app.amqp.queues.consume_from.keys())
    for queue in queue_names:
        ...  # do_something
I found the solution by digging into how Celery displays its banner when the worker starts. It returns the names of the queue(s) that the worker is listening on at program startup. You can use the amqp.Queues object wherever else it's available to get the same info.
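To tie this back to the original sanity-check question, here is a minimal sketch of how the handler could compare the consumed queues against an expected set. REQUIRED_QUEUES and missing_queues are hypothetical names I made up for illustration, and I only log the mismatch; what to do about it is up to you. The Celery wiring is left as a comment so the check itself can be tried without a broker.

```python
import logging
from types import SimpleNamespace

logger = logging.getLogger(__name__)

# Hypothetical setting: the queues this worker is expected to consume from.
REQUIRED_QUEUES = {"awesome_mode"}


def missing_queues(consumed, required):
    """Return the required queue names that are not being consumed, sorted."""
    return sorted(set(required) - set(consumed))


def check_queues(instance, **kwargs):
    """Log an error if the worker was started without a required queue."""
    consumed = instance.app.amqp.queues.consume_from.keys()
    missing = missing_queues(consumed, REQUIRED_QUEUES)
    if missing:
        logger.error("Worker is not consuming required queues: %s", missing)
    return missing


# In a real worker module (assumes Celery is installed):
#   from celery.signals import celeryd_after_setup
#   celeryd_after_setup.connect(check_queues)

if __name__ == "__main__":
    # Stand-in for the worker instance, only for trying the check locally.
    fake_queues = SimpleNamespace(consume_from={"celery": None})
    fake = SimpleNamespace(app=SimpleNamespace(amqp=SimpleNamespace(queues=fake_queues)))
    print(check_queues(fake))
```

Keeping the comparison in a plain function means it can be unit-tested separately from the signal handler.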
Upvotes: 0
Reputation: 32753
After a bit of interactive debugging I found app.amqp.queues, which is a dictionary where the key is the name of the queue and the value is a Queue.
Unfortunately the dictionary is not populated immediately after initialisation, but does work after the worker_ready signal.
Placing this code after app initialisation seems to work. It could probably be placed elsewhere of course.
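from celery.signals import worker_ready

@worker_ready.connect
def worker_ready_handler(sender=None, **kwargs):
    # list() so the output is a plain list on Python 3 as well
    print(list(app.amqp.queues.keys()))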
The worker logs:
[2015-04-22 07:41:01,147: WARNING/MainProcess] ['celery', 'awesome_mode']
[2015-04-22 07:41:01,148: WARNING/MainProcess] celery@zaptop ready.
Upvotes: 2