cralfaro

Reputation: 5948

Celery: one broker, multiple queues and workers

I have a Python file called tasks.py in which I define 4 tasks. I would like to configure Celery to use 4 queues, because each queue would have a different number of workers assigned. I read that I should use the task_routes setting, but I have tried several options without success.

I was following this doc: celery route_tasks docs

My goal is to run 4 workers, one for each task, so that tasks from different queues are never mixed between workers. Is this possible? Is it a good approach?

If I am doing something wrong, I would be happy to change my code to make it work.

Here is my config so far:

tasks.py

from celery import Celery
from kombu import Queue

app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('queueA', routing_key='tasks.task_1'),
    Queue('queueB', routing_key='tasks.task_2'),
    Queue('queueC', routing_key='tasks.task_3'),
    Queue('queueD', routing_key='tasks.task_4'),
)


@app.task
def task_1():
    print("Task of level 1")


@app.task
def task_2():
    print("Task of level 2")


@app.task
def task_3():
    print("Task of level 3")


@app.task
def task_4():
    print("Task of level 4")

Run one Celery worker for each queue:

celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug -Q queueB --logfile=celery-B.log -n W2&
celery -A tasks worker --loglevel=debug -Q queueC --logfile=celery-C.log -n W3&
celery -A tasks worker --loglevel=debug -Q queueD --logfile=celery-D.log -n W4&

Upvotes: 4

Views: 8541

Answers (2)

Abhishek

Reputation: 2673

Note: "task" and "message" are used interchangeably in this answer; both refer to the payload that the producer sends to RabbitMQ.

You can either follow the approach suggested by Chillar, or you can define the task_routes configuration to route messages to the appropriate queue. This way you don't need to specify a queue name every time you call apply_async.

Example: Route task1 to QueueA and route task2 to QueueB

app = Celery('my_app')
app.conf.update(
    task_routes={
        'task1': {'queue': 'QueueA'},
        'task2': {'queue': 'QueueB'}
    }
)
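
For example, here is a minimal end-to-end sketch (the broker URL and the explicit name='task1' are assumptions; the explicit names make the registered task names match the task_routes keys, since the default name would otherwise be '<module>.task1'):

from celery import Celery

app = Celery('my_app', broker='pyamqp://guest@localhost//')
app.conf.update(
    task_routes={
        'task1': {'queue': 'QueueA'},
        'task2': {'queue': 'QueueB'}
    }
)

# name='task1' makes the registered name match the task_routes key
@app.task(name='task1')
def task1():
    print("task1 ran")

@app.task(name='task2')
def task2():
    print("task2 ran")

# No queue argument needed: the router sends each message to its queue
task1.delay()         # goes to QueueA
task2.apply_async()   # goes to QueueB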

Sending a task to multiple queues is a bit tricky. You will have to declare an exchange, and then route your task with an appropriate routing_key. You can get more information about exchange types here. Let's go with direct for the purpose of illustration.

  1. Create Exchange

    from kombu import Exchange, Queue, binding
    exchange_for_queueA_and_B = Exchange('exchange_for_queueA_and_B', type='direct')
    
  2. Create bindings on Queues to that exchange

    app.conf.update(
        task_queues=(
            Queue('QueueA', [
                binding(exchange_for_queueA_and_B, routing_key='queue_a_and_b')
            ]),
            Queue('QueueB', [
                binding(exchange_for_queueA_and_B, routing_key='queue_a_and_b')
            ])
        )
    )
    
  3. Define the task_route to send task1 to the exchange

    app.conf.update(
        task_routes={
            'task1': {'exchange': 'exchange_for_queueA_and_B', 'routing_key': 'queue_a_and_b'}
        }
    )
    

You can also pass the exchange and routing_key options to apply_async, as Chillar suggests in the other answer.
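
For instance, assuming task1 is a task registered on this app, either of the calls below publishes one message that the direct exchange copies into both QueueA and QueueB, because both queues are bound with the same routing key (a sketch; the second form bypasses task_routes entirely):

# Routed via the task_routes entry from step 3
task1.delay()

# Same effect, passing the routing options per call
task1.apply_async(
    exchange='exchange_for_queueA_and_B',
    routing_key='queue_a_and_b'
)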

After that, you can define your workers on same machine or different machines, to consume from those queues.

celery -A my_app worker -n consume_from_QueueA_and_QueueB -Q QueueA,QueueB
celery -A my_app worker -n consume_from_QueueA_only -Q QueueA
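
Note that the -n node names must be unique. Also, because the first worker consumes from both QueueA and QueueB, a message routed to QueueA may be picked up by either worker; only messages in QueueB are guaranteed to go to the first one.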

Upvotes: 1

Chillar Anand

Reputation: 29594

There is no need to get into complex routing to submit tasks to different queues. Define your tasks as usual.

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def task_1():
    print("Task of level 1")


@app.task
def task_2():
    print("Task of level 2")

Now, while queuing the tasks, put each task in the proper queue. Here is an example of how to do it.

In [12]: from tasks import *

In [14]: result = task_1.apply_async(queue='queueA')

In [15]: result = task_2.apply_async(queue='queueB')

This will put task_1 in the queue named queueA and task_2 in queueB.
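
If you want to confirm where the messages landed before any worker starts consuming, you can list the queues on the broker (assuming RabbitMQ, as in the question):

rabbitmqctl list_queues name messages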

Now you can just start your workers to consume them.

celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug -Q queueB --logfile=celery-B.log -n W2&

Upvotes: 10
