Reputation: 1069
I'm trying to start a celery worker so that it only listens to a single queue. This is not a problem; I can do it like this:
python -m celery worker -A my_module -Q my_queue -c 1
But now I also want my_queue to be a broadcast queue, so I put this in my celeryconfig:
from kombu.common import Broadcast
CELERY_QUEUES = (Broadcast('my_queue'),)
But as soon as I do this I cannot start my worker anymore. I get this error from rabbitmq:
amqp.exceptions.PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'myvhost': received 'fanout' but current is 'direct'
If I start the worker without -Q (but leave Broadcast in celeryconfig.py as described above) and list the rabbitmq queues, I can see that a broadcast queue is created and named like this:
bcast.43fecba7-786a-461c-a322-620039b29b8b
Similarly, if I define this queue on the worker (using -Q as mentioned above) or as a simple Queue in celeryconfig.py like this:
from kombu import Queue
CELERY_QUEUES = (Queue('my_queue'),)
I can see this queue in rabbitmq like this:
my_queue
It appears that it does not matter what I pass to the Broadcast call when defining the queue: this seems to be an internal celery name that is not used as the queue name in rabbitmq.
So I'm guessing that when the worker starts, my_queue is created first, and once that's done it cannot be turned into a Broadcast.
I could have a worker that listens to every queue (not only my_queue) by removing the -Q argument. But it would be nice to have a single process that only listens to that particular queue, since the tasks I put there are fast and I'd like to bring latency down as much as possible.
--- Edit 1 ---
I spent some time on this problem, and it seems the bcast queue mentioned above does not appear consistently. After resetting rabbitmq and running celery without the -Q option, the bcast queue did not appear...
Upvotes: 2
Views: 3649
Reputation: 29524
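When using a broker for sending messages, the client and the workers must agree on the same configuration values. If you have to change the configuration, you need to purge the existing messages and restart everything so that they are in sync. In your case the error says the broker still holds an exchange named my_queue of type direct, so a later attempt to declare it as fanout is rejected.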
When defining a broadcast queue you can pass your own exchange with an explicit type, which lets you pick an exchange name that does not collide with an existing one:
from kombu.common import Broadcast
from kombu import Exchange
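# Dedicated fanout exchange, named differently from any existing direct exchange
exchange = Exchange('custom_exchange', type='fanout')
CELERY_QUEUES = (
    # 'bcast' is the name the worker refers to with -Q
    Broadcast(name='bcast', exchange=exchange),
)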
Now you can start the worker with:
celery worker -l info -A tasks -Q bcast
Upvotes: 4