user2216194
user2216194

Reputation: 711

Celery routing tasks to wrong worker server

So I have two worker servers (celeryd) that use two virtual hosts in the same rabbitmq server. I pass in a variable to my messagecenter.py file in order to specify which broker url it should use to publish the task. After debugging for a while I found out that once celery establishes a connection with a broker url - it does not disconnect from the previous vhost and connect to the new one. I am not sure exactly how exchanges and bindings work in celery. So any help is highly appreciated.

messagecenter.py/

from celery import Celery

MESSAGE_SETTINGS = {
    "WORKER1": "amqp://user1:pass1@server-name:5672/vhost1",
    "WORKER2": "amqp://user2:pass2@server-name:5672/vhost2"
}

class MessageCenter(object):
    def __init__(self, config):
        self._config = config
        self._celery = Celery()

    def produce_task(self, name, uuid, params):
        self._celery.conf.update(
            BROKER_URL = self._config[name])
        self._celery.send_task(uuid, params) 

And I pass in either 'WORKER1' or 'WORKER2' along with the necessary params to publish the task i.e. send_task. I am hoping there is a way I can correctly route my task to the different servers. Any help will be highly appreciated.

Upvotes: 3

Views: 1295

Answers (1)

user2216194
user2216194

Reputation: 711

Found the solution here

So I changed my celeryconfig on the worker servers to include two exchanges (each with its own set of bindings and routing_keys). And then I passed those in as params from my producer (i.e. send_task('task_name', [params], queue = 'myQueue', routing_key = 'myRoutingKey'))

just for reference, here is how my celeryconfig on one of the servers look like:

BROKER_URL = "amqp://server_URL/cel_host"

CELERY_APP = 'proj.tasks'

CELERY_IMPORTS = ('proj.tasks', )

CELERY_ROUTES = {
        'proj.tasks.send_email': {
            'queue': 'email_tasks',
            'routing_key': 'email.import',
        },
}

Upvotes: 1

Related Questions