Greg0ry
Greg0ry

Reputation: 1069

start celery worker and enable it for broadcast queue

I'm trying to start celery worker so it only listens to single queue. This is not a problem, I can do this that way:

python -m celery worker -A my_module -Q my_queue -c 1

But now I also want this my_queue queue to be a broadcast queue, so I do this in my celeryconfig:

from kombu.common import Broadcast
CELERY_QUEUES = (Broadcast('my_queue'),)

But as soon as I do this I cannot start my worker anymore, I get error from rabbitmq:

amqp.exceptions.PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'myvhost': received 'fanout' but current is 'direct'

If I start worker without -Q (but leaving Broadcast in celeryconfig.py as described above) and I list rabbitmq queues I can see broadcast queue is created and named like this:

bcast.43fecba7-786a-461c-a322-620039b29b8b

And similarly if I define this queue within worker (using -Q as mentioned above) or as simple Queue in celeryconfig.py like this:

from kombu import Queue
CELERY_QUEUES = (Queue('my_queue'),)

I can see this queue in rabbitmq like this:

my_queue

It apperas it does not matter what I put into Broadcast call when defining the queue - this seems to be internal celery name, not passed to rabbitmq.

So I'm guessing when worker is starting then my_queue is created and once that's done it cannot be made Broadcast.

I can have a worker that listens to any queue (not only to my_queue) which I would start by removing the -Q argument. But it would be nice to be able to have a single process that only listens to that particular queue since my tasks I throw in there are fast and I'd like to bring latency down as much as possible.

--- Edit 1 --- Spent some time with this problem and it seems bcast queue mentioned above does not appear consistently. After reseting rabbitmq and running celery without -Q option bcast queue did not appear...

Upvotes: 2

Views: 3649

Answers (1)

Chillar Anand
Chillar Anand

Reputation: 29524

When using a broker for sending messages, client and workers must agree on same configuration values. If you have to change config, you need to purge existing messages and restart everything so that they are in sync.

When starting a broadcast queue you can set exchange type and configure the queue.

from kombu.common import Broadcast
from kombu import Exchange


exchange = Exchange('custom_exchange', type='fanout')

CELERY_QUEUES = (
    Broadcast(name='bcast', exchange=exchange),
)

Now you can start worker with

celery worker -l info -A tasks -Q bcast 

Upvotes: 4

Related Questions