fabriciols
fabriciols

Reputation: 989

celery - Tasks that need to run in priority

In my website users can UPDATE they profile (manual) every time he want, or automatic once a day.

This task is being distributed with celery now.

But i have a "problem" :

Every day, in automatic update, a job put ALL users (+-6k users) on queue:

from celery import group
from tasks import *
import datetime
from lastActivityDate.models import UserActivity

today   = datetime.datetime.today()
one_day = datetime.timedelta(days=5)
today -= one_day

print datetime.datetime.today()

user_list = UserActivity.objects.filter(last_activity_date__gte=today)
g = group(update_user_profile.s(i.user.auth.username) for i in user_list)

print datetime.datetime.today()
print g(user_list.count()).get()

If someone try to do the manual update, they will enter on te queue and last forever to be executed.

Is there a way to set this manual task to run in a piority way? Or make a dedicated for each separated queue: manual and automatic?

Upvotes: 36

Views: 39622

Answers (2)

Andrey St
Andrey St

Reputation: 488

If you use RabbitMQ transport then configure your queues the following way: settings.py

from kombu import Queue
...
CELERY_TASK_QUEUES = (
    Queue('default', routing_key='task_default.#', max_priority=10), 
    ...)

Then run your tasks:

my_low_prio_task.apply_async(args=(...), priority=1)
my_high_prio_task.apply_async(args=(...), priority=10)

Presently this code works for kombu==4.6.11, celery==4.4.6.

Upvotes: 28

Satoshi Yoshinaga
Satoshi Yoshinaga

Reputation: 694

Celery does not support task priority. (v3.0)

http://docs.celeryproject.org/en/master/faq.html#does-celery-support-task-priorities

You may solve this problem by routing tasks.

http://docs.celeryproject.org/en/latest/userguide/routing.html

Prepare default and priority_high Queue.

from kombu import Queue
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default'),
    Queue('priority_high'),
)

Run two daemon.

user@x:/$ celery worker -Q priority_high
user@y:/$ celery worker -Q default,priority_high

And route task.

your_task.apply_async(args=['...'], queue='priority_high')

Upvotes: 49

Related Questions