Reputation: 18745
I want to create multiple queues for different tasks. For example, an emailqueue for sending emails and a pipedrivequeue for syncing tasks with the Pipedrive API, so that emails do not have to wait until all Pipedrive tasks are synced, and vice versa.
I'm new to routing and I tried two approaches to create queues, but neither of them seems to work.
This is the preferred approach. I tried to define the queue inside the @task decorator:
@task(bind=True, queue='pipedrivequeue')
def backsync_lead(self, lead_id):
    ...
The second approach was to define the routes in settings.py:
CELERY_ROUTES = {  # tried CELERY_TASK_ROUTES too
    'pipedrive.tasks.*': {'queue': 'pipedrivequeue'},
    ...
}
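One thing that may be worth checking with the routes approach is whether the glob key covers the full task names at all. The sketch below is only an approximation: it uses the standard library's fnmatchcase as a stand-in for Celery's own glob handling, and it assumes that route keys are compared against the complete dotted task name (the names are taken from the [tasks] list in the worker banner). The helper matching_tasks is hypothetical, not part of Celery.

```python
from fnmatch import fnmatchcase

# Full task names as printed in the worker's [tasks] banner.
TASK_NAMES = [
    "project.apps.apis.pipedrive.tasks.backsync_all_stages",
    "project.apps.apis.pipedrive.tasks.backsync_lead",
]


def matching_tasks(pattern, names=TASK_NAMES):
    """Return the task names a glob-style route key would cover.

    fnmatchcase is only a stand-in for Celery's glob matching (assumption).
    """
    return [name for name in names if fnmatchcase(name, pattern)]


# Key as written in settings.py: it does not start with "project.apps.apis".
short_key = matching_tasks("pipedrive.tasks.*")

# Key spelled out with the full module path of the tasks.
full_key = matching_tasks("project.apps.apis.pipedrive.tasks.*")

print(short_key)
print(full_key)
```

Under that assumption, the short key covers neither task, while the key with the full module path covers both, so the route key may need the full prefix.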
In both cases, when I run the celery worker manually, I see only the one default celery queue.
(project) milano@milano-PC:~/PycharmProjects/project$ celery -A project.celery worker -l info
-------------- celery@milano-PC v4.2.2 (windowlicker)
---- **** -----
--- * *** * -- Linux-4.15.0-47-generic-x86_64-with-Ubuntu-18.04-bionic 2019-04-12 17:17:05
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: project:0x7f3b47f66cf8
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. project.apps.apis.pipedrive.tasks.backsync_all_stages
. project.apps.apis.pipedrive.tasks.backsync_lead
As you can see in this line:
-------------- [queues]
.> celery exchange=celery(direct) key=celery
There appears to be just one queue. I want to use this queue only for tasks that have no queue specified.
Do you know what the problem is?
EDIT
(project) milano@milano-PC:~/PycharmProjects/peoject$ celery inspect active_queues
Error: No nodes replied within time constraint.
Upvotes: 6
Views: 11247
Reputation: 12869
You need to run a worker with the queue named explicitly; Django will then be able to feed tasks into that queue:
celery worker -A project.celery -l info # Default queue worker
celery worker -A project.celery -l info -Q pipedrivequeue # pipedrivequeue worker
celery worker -A project.celery -l info -Q testqueue # testqueue worker
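To illustrate why the worker has to name the queue, here is a toy model in plain Python. It is not Celery code: ToyBroker and drain are hypothetical names, and the model only assumes what the commands above imply, namely that a worker consumes from the queues it was started with and that tasks sent without a queue land in the default celery queue.

```python
from collections import defaultdict, deque


class ToyBroker:
    """Minimal stand-in for a message broker holding named queues."""

    def __init__(self):
        self.queues = defaultdict(deque)

    def send(self, task_name, queue="celery"):
        # Tasks without an explicit queue go to the default "celery" queue.
        self.queues[queue].append(task_name)


def drain(broker, subscribed):
    """Consume every pending task, but only from the subscribed queues."""
    done = []
    for queue_name in subscribed:
        while broker.queues[queue_name]:
            done.append(broker.queues[queue_name].popleft())
    return done


broker = ToyBroker()
broker.send("send_email")                             # default queue
broker.send("backsync_lead", queue="pipedrivequeue")  # routed task

# A worker started without -Q only listens on the default queue.
default_worker_done = drain(broker, ["celery"])
left_waiting = list(broker.queues["pipedrivequeue"])

# A worker started with -Q pipedrivequeue picks up the routed task.
pipedrive_worker_done = drain(broker, ["pipedrivequeue"])

print(default_worker_done, left_waiting, pipedrive_worker_done)
```

In this model the routed task stays in pipedrivequeue until a worker subscribed to that queue runs, which is the situation the separate -Q workers are meant to resolve.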
Upvotes: 3