Reputation: 5948
I have a Python file called tasks.py in which I define 4 separate tasks. I would like to configure Celery to use 4 queues, because each queue would have a different number of workers assigned. I read that I should use the task_routes setting, but I tried several options without success.
I was following the Celery task_routes docs.
My goal is to run 4 workers, one for each task, and not mix tasks from different workers in different queues. Is this possible? Is it a good approach?
If I am doing something wrong, I would be happy to change my code to make it work.
Here is my config so far:
tasks.py
from celery import Celery
from kombu import Queue

app = Celery('tasks', broker='pyamqp://guest@localhost//')

# Tasks without an explicit route go to the default queue
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('queueA', routing_key='tasks.task_1'),
    Queue('queueB', routing_key='tasks.task_2'),
    Queue('queueC', routing_key='tasks.task_3'),
    Queue('queueD', routing_key='tasks.task_4'),
)

@app.task
def task_1():
    print("Task of level 1")

@app.task
def task_2():
    print("Task of level 2")

@app.task
def task_3():
    print("Task of level 3")

@app.task
def task_4():
    print("Task of level 4")
Run one Celery worker for each queue:
celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug -Q queueB --logfile=celery-B.log -n W2&
celery -A tasks worker --loglevel=debug -Q queueC --logfile=celery-C.log -n W3&
celery -A tasks worker --loglevel=debug -Q queueD --logfile=celery-D.log -n W4&
Upvotes: 4
Views: 8541
Reputation: 2673
Note: task and message are used interchangeably in this answer. A message is basically the payload that the producer sends to RabbitMQ.
You can either follow the approach suggested by Chillar, or you can define the task_routes configuration to route messages to the appropriate queue. This way you don't need to specify the queue name every time you call apply_async.
Example: route task1 to QueueA and task2 to QueueB
from celery import Celery

app = Celery('my_app')
app.conf.update(
    task_routes={
        # keys are task names, values say where to send them
        'task1': {'queue': 'QueueA'},
        'task2': {'queue': 'QueueB'},
    }
)
Sending a task to multiple queues is a bit tricky. You have to declare an exchange and then route your task with the appropriate routing_key. You can get more information about the types of exchange here. Let's go with direct for the purpose of illustration.
Create Exchange
from kombu import Exchange, Queue, binding
exchange_for_queueA_and_B = Exchange('exchange_for_queueA_and_B', type='direct')
Create bindings on Queues to that exchange
app.conf.update(
    task_queues=(
        Queue('QueueA', [
            binding(exchange_for_queueA_and_B, routing_key='queue_a_and_b')
        ]),
        Queue('QueueB', [
            binding(exchange_for_queueA_and_B, routing_key='queue_a_and_b')
        ])
    )
)
Define the task_routes entry to send task1 to the exchange
app.conf.update(
    task_routes={
        'task1': {'exchange': 'exchange_for_queueA_and_B', 'routing_key': 'queue_a_and_b'}
    }
)
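To see why this setup delivers task1 to both queues, here is a toy model of a direct exchange in plain Python. It is only an illustration of the matching rule (exact routing-key match, one copy per bound queue), not how RabbitMQ or kombu are implemented; the DirectExchange class and its methods are made up for this sketch.

```python
from collections import defaultdict


class DirectExchange:
    """Toy stand-in for a direct exchange (hypothetical, for illustration)."""

    def __init__(self):
        self.bindings = defaultdict(list)  # routing_key -> bound queue names
        self.queues = defaultdict(list)    # queue name -> delivered messages

    def bind(self, queue, routing_key):
        self.bindings[routing_key].append(queue)

    def publish(self, message, routing_key):
        # Every queue bound with exactly this key gets its own copy.
        for queue in self.bindings.get(routing_key, []):
            self.queues[queue].append(message)


exchange = DirectExchange()
exchange.bind('QueueA', 'queue_a_and_b')
exchange.bind('QueueB', 'queue_a_and_b')

exchange.publish('task1', routing_key='queue_a_and_b')
# No queue is bound with this key, so the message goes nowhere.
exchange.publish('task2', routing_key='some_other_key')
```

Because each bound queue receives its own copy, a task routed this way is executed once per queue that has a consumer, which is worth keeping in mind if the task has side effects.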
You can also pass the exchange and routing_key options to apply_async, as suggested by Chillar in the above answer.
After that, you can define your workers on same machine or different machines, to consume from those queues.
celery -A my_app worker -n consume_from_QueueA_and_QueueB -Q QueueA,QueueB
celery -A my_app worker -n consume_from_QueueA_only -Q QueueA
Upvotes: 1
Reputation: 29594
There is no need to get into complex routing for submitting tasks into different queues. Define your tasks as usual.
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def task_1():
    print("Task of level 1")

@app.task
def task_2():
    print("Task of level 2")
Now, when queuing the tasks, put each task in the proper queue. Here is an example of how to do it.
In [12]: from tasks import *
In [14]: result = task_1.apply_async(queue='queueA')
In [15]: result = task_2.apply_async(queue='queueB')
This will put task_1 in the queue named queueA and task_2 in queueB.
Now you can just start your workers to consume them.
celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug -Q queueB --logfile=celery-B.log -n W2&
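Since the original goal was a different number of workers per queue, one option is to give each worker its own --concurrency value. The sketch below just builds the command lines; the concurrency numbers, the worker_command helper, and the log file naming are placeholders chosen for illustration, not recommendations.

```python
import shlex

# Hypothetical per-queue concurrency; pick values that fit your workload.
QUEUE_CONCURRENCY = {'queueA': 4, 'queueB': 2, 'queueC': 1, 'queueD': 1}


def worker_command(app_module, queue, concurrency, index):
    """Build the shell command for one worker bound to a single queue."""
    args = [
        'celery', '-A', app_module, 'worker',
        '--loglevel=info',
        '-Q', queue,                        # consume only from this queue
        '--concurrency', str(concurrency),  # number of worker processes
        '-n', 'W{}@%h'.format(index),       # unique node name per worker
        '--logfile', 'celery-{}.log'.format(queue),
    ]
    return ' '.join(shlex.quote(arg) for arg in args)


commands = [
    worker_command('tasks', queue, concurrency, index)
    for index, (queue, concurrency) in enumerate(QUEUE_CONCURRENCY.items(), start=1)
]

for command in commands:
    print(command)
```

Each printed line can be run in its own shell or handed to a process supervisor, so every queue gets a dedicated worker with its own pool size.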
Upvotes: 10