Reputation: 1221
I am using Celery to perform asynchronous background tasks, with Redis as the backend. I'm interested in the behaviour of a Celery worker in the following situation:
I am running a worker as a daemon using celeryd. This worker has been assigned two queues to consume from through the -Q option:

celeryd -E -Q queue1,queue2

How does the worker decide where to fetch the next task from? Does it randomly consume a task from either queue1 or queue2, or will it prioritise fetching from queue1 because it is first in the list of arguments passed to -Q?
Upvotes: 40
Views: 17230
Reputation: 907
As of Celery 5.3, the queue priority is to some extent configurable when the Redis transport is used.
It can be adjusted first by changing the queue order strategy, which is a Redis-specific broker transport option.
Its default value is round_robin, which aims to give every queue an equal opportunity to be consumed from. The other available value is priority. According to the documentation, the worker will then:
Consume from queues in original order, so that if the first queue always contains messages, the rest of the queues in the list will never be consumed from.
After updating the configuration:
celery_app.conf.update(broker_transport_options={"queue_order_strategy": "priority"})
As a second step, the worker must be started with the queues in the correct order, listing the higher-priority queue first:
celery worker -Q higher_priority_queue,other_queue
It is my understanding that this setup will not interrupt a currently running task from a lower-priority queue when a new task arrives in a higher-priority queue; the worker waits for the running task to finish. It then checks the higher-priority queue again, and if that queue contains a new task, runs it regardless of whether more tasks are waiting in the lower-priority queues.
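Putting both steps together, a minimal configuration sketch might look like the following (the app name, broker URL, and queue names are illustrative, not from the original answer):

```python
from celery import Celery

# Hypothetical app; adjust the broker URL to your Redis instance.
app = Celery("tasks", broker="redis://localhost:6379/0")

# Redis-specific transport option: consume queues strictly in the
# order they are listed when the worker is started.
app.conf.broker_transport_options = {"queue_order_strategy": "priority"}
```

The worker would then be started as shown above, e.g. celery worker -Q higher_priority_queue,other_queue, so that higher_priority_queue is always checked first.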
Upvotes: 2
Reputation: 48730
Using the pyamqp broker library pointing at a rabbitmq server, tasks get processed in a round-robin style. See proof below.
It appears that the order tasks are processed in is determined by the broker library, not the actual backend (rabbitmq vs redis is not the issue).
Software versions:
$ pip freeze | egrep "celery|kombu|amqp"
amqp==2.5.2
celery==4.4.2
kombu==4.6.8
from time import sleep

@app.task
def sleepy(name):
    print(f"Processing: {name}")
    sleep(0.5)
Then in another shell, queue the tasks:
from time import sleep

def queue_them():
    for x in range(50):
        sleepy.apply_async(args=(f"Q1-T{x}",), queue="Q1")
        sleep(0.1)
    for x in range(20):
        sleepy.apply_async(args=(f"Q2-T{x}",), queue="Q2")
        sleep(0.1)
    sleepy.apply_async(args=("Q3-T0",), queue="Q3")
    for x in range(30):
        sleepy.apply_async(args=(f"Q2MOAR-T{x}",), queue="Q2")

# setup - get celery to set up the queues and exchanges
sleepy.apply_async(args=("nothing",), queue="Q1")
sleepy.apply_async(args=("nothing",), queue="Q2")
sleepy.apply_async(args=("nothing",), queue="Q3")

# run the test
queue_them()
And in another shell, run celery:
$ celery worker -A myapp.celery --pool=prefork --concurrency=2 -Ofair --queues=Q1,Q3,Q2
[2020-05-05 21:59:11,547] WARNING [celery.redirected:235] Processing: Q1-T1
[2020-05-05 21:59:11,547] WARNING [celery.redirected:235] Processing: Q1-T0
[2020-05-05 21:59:12,052] WARNING [celery.redirected:235] Processing: Q1-T2
[2020-05-05 21:59:12,053] WARNING [celery.redirected:235] Processing: Q1-T3
[2020-05-05 21:59:12,556] WARNING [celery.redirected:235] Processing: Q1-T5
[2020-05-05 21:59:12,556] WARNING [celery.redirected:235] Processing: Q1-T4
[2020-05-05 21:59:13,062] WARNING [celery.redirected:235] Processing: Q1-T6
[2020-05-05 21:59:13,063] WARNING [celery.redirected:235] Processing: Q1-T7
[2020-05-05 21:59:13,565] WARNING [celery.redirected:235] Processing: Q1-T9
[2020-05-05 21:59:13,565] WARNING [celery.redirected:235] Processing: Q1-T8
[2020-05-05 21:59:14,069] WARNING [celery.redirected:235] Processing: Q1-T10
[2020-05-05 21:59:14,069] WARNING [celery.redirected:235] Processing: Q3-T0
[2020-05-05 21:59:14,571] WARNING [celery.redirected:235] Processing: Q2-T0
[2020-05-05 21:59:14,572] WARNING [celery.redirected:235] Processing: Q2-T1
[2020-05-05 21:59:15,078] WARNING [celery.redirected:235] Processing: Q1-T11
[2020-05-05 21:59:15,078] WARNING [celery.redirected:235] Processing: Q2-T2
[2020-05-05 21:59:15,581] WARNING [celery.redirected:235] Processing: Q2-T3
[2020-05-05 21:59:15,581] WARNING [celery.redirected:235] Processing: Q1-T12
[2020-05-05 21:59:16,084] WARNING [celery.redirected:235] Processing: Q1-T13
[2020-05-05 21:59:16,084] WARNING [celery.redirected:235] Processing: Q2-T4
[2020-05-05 21:59:16,586] WARNING [celery.redirected:235] Processing: Q1-T14
[2020-05-05 21:59:16,586] WARNING [celery.redirected:235] Processing: Q2-T5
[2020-05-05 21:59:17,089] WARNING [celery.redirected:235] Processing: Q1-T15
[2020-05-05 21:59:17,089] WARNING [celery.redirected:235] Processing: Q2-T6
[2020-05-05 21:59:17,591] WARNING [celery.redirected:235] Processing: Q1-T16
[2020-05-05 21:59:17,592] WARNING [celery.redirected:235] Processing: Q2-T7
[2020-05-05 21:59:18,094] WARNING [celery.redirected:235] Processing: Q1-T17
[2020-05-05 21:59:18,094] WARNING [celery.redirected:235] Processing: Q2-T8
[2020-05-05 21:59:18,597] WARNING [celery.redirected:235] Processing: Q1-T18
[2020-05-05 21:59:18,597] WARNING [celery.redirected:235] Processing: Q2-T9
[2020-05-05 21:59:19,102] WARNING [celery.redirected:235] Processing: Q1-T19
[2020-05-05 21:59:19,102] WARNING [celery.redirected:235] Processing: Q1-T20
[2020-05-05 21:59:19,607] WARNING [celery.redirected:235] Processing: Q1-T21
[2020-05-05 21:59:19,607] WARNING [celery.redirected:235] Processing: Q1-T22
[2020-05-05 21:59:20,110] WARNING [celery.redirected:235] Processing: Q1-T23
[2020-05-05 21:59:20,110] WARNING [celery.redirected:235] Processing: Q2-T10
[2020-05-05 21:59:20,614] WARNING [celery.redirected:235] Processing: Q1-T24
[2020-05-05 21:59:20,614] WARNING [celery.redirected:235] Processing: Q2-T11
[2020-05-05 21:59:21,118] WARNING [celery.redirected:235] Processing: Q1-T25
[2020-05-05 21:59:21,118] WARNING [celery.redirected:235] Processing: Q1-T26
[2020-05-05 21:59:21,622] WARNING [celery.redirected:235] Processing: Q2-T12
[2020-05-05 21:59:21,622] WARNING [celery.redirected:235] Processing: Q1-T27
[2020-05-05 21:59:22,124] WARNING [celery.redirected:235] Processing: Q1-T28
[2020-05-05 21:59:22,124] WARNING [celery.redirected:235] Processing: Q2-T13
[2020-05-05 21:59:22,627] WARNING [celery.redirected:235] Processing: Q2-T14
[2020-05-05 21:59:22,627] WARNING [celery.redirected:235] Processing: Q1-T29
[2020-05-05 21:59:23,129] WARNING [celery.redirected:235] Processing: Q1-T31
[2020-05-05 21:59:23,129] WARNING [celery.redirected:235] Processing: Q1-T30
[2020-05-05 21:59:23,631] WARNING [celery.redirected:235] Processing: Q2-T15
[2020-05-05 21:59:23,632] WARNING [celery.redirected:235] Processing: Q1-T32
[2020-05-05 21:59:24,134] WARNING [celery.redirected:235] Processing: Q1-T33
[2020-05-05 21:59:24,134] WARNING [celery.redirected:235] Processing: Q2-T16
[2020-05-05 21:59:24,636] WARNING [celery.redirected:235] Processing: Q2-T17
[2020-05-05 21:59:24,636] WARNING [celery.redirected:235] Processing: Q2-T18
[2020-05-05 21:59:25,138] WARNING [celery.redirected:235] Processing: Q2-T19
[2020-05-05 21:59:25,139] WARNING [celery.redirected:235] Processing: Q1-T34
[2020-05-05 21:59:25,641] WARNING [celery.redirected:235] Processing: Q1-T35
[2020-05-05 21:59:25,642] WARNING [celery.redirected:235] Processing: Q2MOAR-T0
[2020-05-05 21:59:26,144] WARNING [celery.redirected:235] Processing: Q1-T36
[2020-05-05 21:59:26,144] WARNING [celery.redirected:235] Processing: Q1-T37
[2020-05-05 21:59:26,649] WARNING [celery.redirected:235] Processing: Q2MOAR-T1
[2020-05-05 21:59:26,649] WARNING [celery.redirected:235] Processing: Q1-T38
[2020-05-05 21:59:27,153] WARNING [celery.redirected:235] Processing: Q2MOAR-T2
[2020-05-05 21:59:27,154] WARNING [celery.redirected:235] Processing: Q1-T39
[2020-05-05 21:59:27,656] WARNING [celery.redirected:235] Processing: Q2MOAR-T3
[2020-05-05 21:59:27,656] WARNING [celery.redirected:235] Processing: Q2MOAR-T4
[2020-05-05 21:59:28,159] WARNING [celery.redirected:235] Processing: Q2MOAR-T5
[2020-05-05 21:59:28,160] WARNING [celery.redirected:235] Processing: Q1-T40
[2020-05-05 21:59:28,664] WARNING [celery.redirected:235] Processing: Q2MOAR-T6
[2020-05-05 21:59:28,664] WARNING [celery.redirected:235] Processing: Q1-T41
[2020-05-05 21:59:29,167] WARNING [celery.redirected:235] Processing: Q2MOAR-T7
[2020-05-05 21:59:29,167] WARNING [celery.redirected:235] Processing: Q1-T42
And similar results when celery is run with a concurrency of 1:
[2020-05-05 22:01:33,879] WARNING [celery.redirected:235] Processing: Q1-T0
[2020-05-05 22:01:34,385] WARNING [celery.redirected:235] Processing: Q1-T1
[2020-05-05 22:01:34,888] WARNING [celery.redirected:235] Processing: Q1-T2
[2020-05-05 22:01:35,391] WARNING [celery.redirected:235] Processing: Q1-T3
[2020-05-05 22:01:35,894] WARNING [celery.redirected:235] Processing: Q1-T4
[2020-05-05 22:01:36,397] WARNING [celery.redirected:235] Processing: Q1-T5
[2020-05-05 22:01:36,899] WARNING [celery.redirected:235] Processing: Q3-T0
[2020-05-05 22:01:37,404] WARNING [celery.redirected:235] Processing: Q2-T0
[2020-05-05 22:01:37,907] WARNING [celery.redirected:235] Processing: Q2-T1
[2020-05-05 22:01:38,411] WARNING [celery.redirected:235] Processing: Q1-T6
[2020-05-05 22:01:38,913] WARNING [celery.redirected:235] Processing: Q2-T2
[2020-05-05 22:01:39,417] WARNING [celery.redirected:235] Processing: Q2-T3
[2020-05-05 22:01:39,919] WARNING [celery.redirected:235] Processing: Q2-T4
[2020-05-05 22:01:40,422] WARNING [celery.redirected:235] Processing: Q1-T7
[2020-05-05 22:01:40,925] WARNING [celery.redirected:235] Processing: Q2-T5
[2020-05-05 22:01:41,429] WARNING [celery.redirected:235] Processing: Q1-T8
Upvotes: 3
Reputation: 1620
NOTE: This answer is deprecated: the latest versions of Celery work very differently from how they did in 2013...
A worker consuming from several queues consumes tasks in FIFO order, and that order is maintained across the multiple queues as well.
Example:
Queue1: (t1, t2, t5, t7)
Queue2: (t0, t3, t4, t6)
Assuming 0-7 represents the order in which the tasks were published, the order of consumption is t0, t1, t2, t3, t4, t5, t6, t7.
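The merged FIFO order described above can be sketched as a merge by publish time. This is a toy model of the behaviour, not Celery's actual implementation:

```python
import heapq

# Each task is (publish_order, name); each queue preserves its own order.
queue1 = [(1, "t1"), (2, "t2"), (5, "t5"), (7, "t7")]
queue2 = [(0, "t0"), (3, "t3"), (4, "t4"), (6, "t6")]

# Merging by publish order models the global FIFO behaviour described above.
consumed = [name for _, name in heapq.merge(queue1, queue2)]
print(consumed)  # ['t0', 't1', 't2', 't3', 't4', 't5', 't6', 't7']
```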
Upvotes: 9
Reputation: 21892
From my testing, it processes multiple queues round-robin style.
If I use this test code:
from celery import task
import time

@task
def my_task(item_id):
    time.sleep(0.5)
    print('Processing item "%s"...' % item_id)

def add_items_to_queue(queue_name, items_count):
    for i in xrange(0, items_count):
        my_task.apply_async(('%s-%d' % (queue_name, i),), queue=queue_name)

add_items_to_queue('queue1', 10)
add_items_to_queue('queue2', 10)
add_items_to_queue('queue3', 5)
And start the queue with (using django-celery):
`manage.py celery worker -Q queue1,queue2,queue3`
It outputs:
Processing item "queue1-0"...
Processing item "queue3-0"...
Processing item "queue2-0"...
Processing item "queue1-1"...
Processing item "queue3-1"...
Processing item "queue2-1"...
Processing item "queue1-2"...
Processing item "queue3-2"...
Processing item "queue2-2"...
Processing item "queue1-3"...
Processing item "queue3-3"...
Processing item "queue2-3"...
Processing item "queue1-4"...
Processing item "queue3-4"...
Processing item "queue2-4"...
Processing item "queue1-5"...
Processing item "queue2-5"...
Processing item "queue1-6"...
Processing item "queue2-6"...
Processing item "queue1-7"...
Processing item "queue2-7"...
Processing item "queue1-8"...
Processing item "queue2-8"...
Processing item "queue1-9"...
Processing item "queue2-9"...
So it pulls one item from each queue before going back to the first queue, even though ALL of the queue1 tasks were published before the queue2 and queue3 tasks.
Note: As @WarLord pointed out, this exact behavior only holds when CELERYD_PREFETCH_MULTIPLIER is set to 1. If it is greater than 1, items are fetched from the queues in batches. So if you have 4 processes with the prefetch multiplier set to 4, 16 items will be pulled from the queues right off the bat; you won't get exactly the output above, but consumption will still roughly follow round-robin.
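The effect of the prefetch multiplier on interleaving can be illustrated with a toy model (again, a simplification, not Celery's real internals): the worker repeatedly takes up to prefetch tasks from each non-empty queue in round-robin order.

```python
from collections import deque

def drain(queues, prefetch):
    """Toy model: take up to `prefetch` tasks from each non-empty
    queue in round-robin order until all queues are empty."""
    order = []
    while any(queues.values()):
        for q in queues.values():
            for _ in range(min(prefetch, len(q))):
                order.append(q.popleft())
    return order

queues = {
    "queue1": deque(f"q1-{i}" for i in range(4)),
    "queue2": deque(f"q2-{i}" for i in range(4)),
}
print(drain(queues, prefetch=1))
# ['q1-0', 'q2-0', 'q1-1', 'q2-1', 'q1-2', 'q2-2', 'q1-3', 'q2-3']
```

With prefetch=1 the output interleaves strictly one task per queue, matching the log above; with prefetch=2 the same queues would drain in pairs (q1-0, q1-1, q2-0, q2-1, ...), which is the batched behaviour described in the note.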
Upvotes: 22