Milano
Milano

Reputation: 18745

Celery - how to use multiple queues?

I want to create a multiple queues for different tasks. For example emailqueue to sending emails or pipedrive queue to sync tasks with pipedrive API so email does not have to wait until all pipedrives are synced and vice versa.

I'm new in routing and I tried two approaches to create queues but none of them seemes to work.

  1. This is a preferred approach. I tried to define queue inside @task decorator

    @task(bind=True,  queue='pipedrivequeue')
    

    def backsync_lead(self,lead_id):

  2. settings.py

    CELERY_ROUTES = { # tried CELERY_TASK_ROUTES too
        'pipedrive.tasks.*': {'queue': 'pipedrivequeue'},
       ...
    }
    

In both cases, when I run celery worker manually, I see only one default celery queue.

(project) milano@milano-PC:~/PycharmProjects/project$ celery -A project.celery worker -l info

 -------------- celery@milano-PC v4.2.2 (windowlicker)
---- **** ----- 
--- * ***  * -- Linux-4.15.0-47-generic-x86_64-with-Ubuntu-18.04-bionic 2019-04-12 17:17:05
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         project:0x7f3b47f66cf8
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . project.apps.apis.pipedrive.tasks.backsync_all_stages
  . project.apps.apis.pipedrive.tasks.backsync_lead

As you can see in this line:

 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

There is probably just one queue. I want to use this queue only for tasks without queue specified.

Do you know where is the problem?

EDIT

   (project) milano@milano-PC:~/PycharmProjects/peoject$ celery inspect active_queues
Error: No nodes replied within time constraint.

Upvotes: 6

Views: 11247

Answers (1)

markwalker_
markwalker_

Reputation: 12869

You need to run a worker with the queue named explicitly, then django will be able to feed into that queue;

celery worker -A project.celery -l info  # Default queue worker
celery worker -A project.celery -l info -Q pipedrivequeue  # pipedrivequeue worker
celery worker -A project.celery -l info -Q testqueue  # testqueue worker

Upvotes: 3

Related Questions