Praful Bagai

Reputation: 17392

Create celery multiple queues

I have a configuration file that maps each category to a list of links:

cat_link = {'cat1':[link1,link2....],'cat2':[link3,link4....],'cat3':[link5,link6....],'cat4':[link7,link8....]}

I want to create one queue per category defined in the configuration file, so that when I process the links for a particular category, each queue handles only its own set of links.

The task that processes the links is the same for every category. I just want each category's links to be processed in that category's own queue.

It should be something like this:

# Each value in cat_link is a list, so send one task per link
for category, links in cat_link.items():
    for link in links:
        process_link.apply_async(args=[link],
                                 queue=category)

How should I create the queues dynamically, keeping in mind that any category can be added or removed in the future?

What should my celeryconfig look like? Currently it is as follows:

BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Europe/Oslo'
CELERY_ENABLE_UTC = True

# Should I read my `cat_link` config setting in a loop and then create the queues?

I've seen different queues used for different tasks, but is it possible to use different queues for the same task?

Upvotes: 1

Views: 3104

Answers (1)

itzMEonTV

Reputation: 20359

If you want to send the same task to different queues dynamically, pass the queue name when you call it:

process_link.apply_async(args=[link1],
                         queue='queue1')

process_link.apply_async(args=[link2],
                         queue='queue2')
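The same pattern can be applied to the category mapping from the question. The following is only a minimal sketch, assuming each category name is used directly as the queue name; `iter_dispatch_args`, `enqueue_all` and the sample `cat_link` data are hypothetical names for illustration, and `task` stands for a Celery task such as `process_link`.

```python
def iter_dispatch_args(cat_link):
    """Yield (link, queue_name) pairs, one per link.

    Assumption: the category name doubles as the queue name.
    """
    for category, links in cat_link.items():
        for link in links:
            yield link, category


def enqueue_all(task, cat_link):
    """Send every link to the queue named after its category.

    `task` is assumed to be a Celery task (for example process_link).
    """
    for link, queue in iter_dispatch_args(cat_link):
        task.apply_async(args=[link], queue=queue)


# Hypothetical sample data standing in for the real configuration
cat_link = {"cat1": ["link1", "link2"], "cat2": ["link3"]}
pairs = list(iter_dispatch_args(cat_link))
```

Because the pairs are rebuilt from the mapping on every call, adding or removing a category in the configuration changes which queues are targeted without any change to this code.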

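You also need the following in your configuration file, so that queues which are not declared in advance get created automatically (this setting is enabled by default in Celery):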

CELERY_CREATE_MISSING_QUEUES = True

One thing to keep in mind: when starting the worker, you have to pass the queue names with the -Q option.

For example:

celery -A proj worker -l info -Q queue1,queue2

This makes the worker consume from the queues queue1 and queue2.
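Since the queue names come from the configuration, the value for -Q can be derived from the same mapping instead of being typed by hand. This is a rough sketch under the assumption that category names are used verbatim as queue names; `worker_command` and the sample `cat_link` are hypothetical, and `proj` is simply the app name from the command above.

```python
def worker_command(cat_link, app="proj"):
    """Build the argument list for a worker consuming from every category queue."""
    # Sorting only keeps the generated command stable between runs
    queues = ",".join(sorted(cat_link))
    return ["celery", "-A", app, "worker", "-l", "info", "-Q", queues]


# Hypothetical sample data standing in for the real configuration
cat_link = {"cat2": ["link3"], "cat1": ["link1", "link2"]}
command = worker_command(cat_link)
print(" ".join(command))  # celery -A proj worker -l info -Q cat1,cat2
```

The resulting list could be passed to something like subprocess.run. When a category is added later, the worker would need to be restarted with the updated list or told to consume from the new queue.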

Upvotes: 6
