Reputation: 711
So I have two worker servers (celeryd) that use two virtual hosts on the same RabbitMQ server. I pass a variable into my messagecenter.py file to specify which broker URL it should use to publish the task. After debugging for a while I found out that once Celery establishes a connection to a broker URL, it does not disconnect from that vhost and connect to the new one. I am not sure exactly how exchanges and bindings work in Celery, so any help is highly appreciated.
messagecenter.py:

from celery import Celery

MESSAGE_SETTINGS = {
    "WORKER1": "amqp://user1:pass1@server-name:5672/vhost1",
    "WORKER2": "amqp://user2:pass2@server-name:5672/vhost2"
}

class MessageCenter(object):
    def __init__(self, config):
        self._config = config
        self._celery = Celery()

    def produce_task(self, name, uuid, params):
        # Updating BROKER_URL has no effect once the app has already connected
        self._celery.conf.update(BROKER_URL=self._config[name])
        self._celery.send_task(uuid, params)
I pass in either 'WORKER1' or 'WORKER2' along with the necessary params to publish the task, i.e. call send_task. I am hoping there is a way I can correctly route my tasks to the different servers. Any help will be highly appreciated.
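For illustration only, one pattern that avoids the reconnect problem described above is to keep one Celery app per broker URL instead of mutating BROKER_URL on a single app. A minimal sketch, reusing the MESSAGE_SETTINGS dict from the code above (the surrounding function is illustrative and untested):

# Sketch only: one Celery app per broker URL, so each keeps its own connection.
from celery import Celery

apps = {name: Celery(broker=url) for name, url in MESSAGE_SETTINGS.items()}

def produce_task(name, task_name, args):
    # 'name' is 'WORKER1' or 'WORKER2'; each app publishes to its own vhost.
    apps[name].send_task(task_name, args)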
Upvotes: 3
Views: 1295
Reputation: 711
Found the solution here
So I changed my celeryconfig on the worker servers to include two exchanges (each with its own set of bindings and routing keys), and then I passed those in as parameters from my producer, i.e. send_task('task_name', [params], queue='myQueue', routing_key='myRoutingKey').
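Applied to the producer side, the call might look roughly like this; a sketch that reuses the queue and routing key from the config shown below, with an illustrative broker URL and wrapper function:

# Sketch: producer routes by queue/routing_key instead of switching brokers.
from celery import Celery

celery = Celery(broker="amqp://server_URL/cel_host")

def produce_task(task_name, params):
    # Routed to the worker consuming 'email_tasks' with routing key 'email.import'
    celery.send_task(task_name, [params],
                     queue='email_tasks', routing_key='email.import')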
Just for reference, here is what my celeryconfig on one of the servers looks like:
BROKER_URL = "amqp://server_URL/cel_host"
CELERY_APP = 'proj.tasks'
CELERY_IMPORTS = ('proj.tasks', )

CELERY_ROUTES = {
    'proj.tasks.send_email': {
        'queue': 'email_tasks',
        'routing_key': 'email.import',
    },
}
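The config above only defines the routes; the queue and exchange declarations that the exchanges-and-bindings setup relies on could be declared roughly like this (a sketch using old-style Celery settings; the exchange name and type are assumptions):

# Sketch: explicit queue/exchange declaration matching the route above.
from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('email_tasks', Exchange('email_tasks', type='direct'),
          routing_key='email.import'),
)

The worker on that server would then be started against this queue, e.g. celery worker -A proj -Q email_tasks (the -A value depends on how the project is laid out).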
Upvotes: 1