I have a Django project with Celery.
Due to RAM limitations I can only run two worker processes.
I have a mix of 'slow' and 'fast' tasks. Fast tasks should be executed as soon as possible. There can be many fast tasks within a short time frame (0.1 s to 3 s), so ideally both CPUs should handle them.
Slow tasks might run for a few minutes, but their results can be delayed.
Slow tasks occur less often, but it can happen that 2 or 3 are queued up at the same time.
My idea was to have one worker (W1) that handles only fast tasks and one worker (W2) that handles both fast and slow tasks.
By default, Celery has a task prefetch multiplier ( https://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-prefetch-multiplier ) of 4, which means that 4 fast tasks could be queued behind a slow task and be delayed by several minutes. Thus I'd like to disable prefetching for worker W2. The docs state:
To disable prefetching, set worker_prefetch_multiplier to 1. Changing that setting to 0 will allow the worker to keep consuming as many messages as it wants.
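The limit that the multiplier controls can be sketched as plain arithmetic. This is a simplified model, assuming the documented relationship that a worker may reserve concurrency * multiplier messages and that a multiplier of 0 means no limit; prefetch_limit is a hypothetical helper, not part of Celery's API.

```python
from typing import Optional


def prefetch_limit(concurrency: int, multiplier: int) -> Optional[int]:
    """Messages a worker may reserve, under a simplified model (assumption).

    limit = concurrency * multiplier; a multiplier of 0 means "no limit".
    """
    if concurrency < 1 or multiplier < 0:
        raise ValueError("concurrency must be >= 1 and multiplier >= 0")
    if multiplier == 0:
        return None  # unlimited: the worker keeps consuming messages
    return concurrency * multiplier


# W2 from the question: one process with the default multiplier of 4.
print(prefetch_limit(1, 4))  # 4
# The --prefetch-multiplier 0 used in the w2 command below: no limit.
print(prefetch_limit(1, 0))  # None
```

With one process and the default multiplier, this gives the 4 fast tasks that can end up stuck behind a slow task.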
However, what I observe is that with a prefetch multiplier of 1, one task is still prefetched and is still delayed by the slow task.
Is this a documentation bug? Is this an implementation bug? Or do I misunderstand the documentation? Is there any way to implement what I want?
The commands I execute to start the workers are:
celery -A miniclry worker --concurrency=1 -n w2 -Q fast,slow --prefetch-multiplier 0
celery -A miniclry worker --concurrency=1 -n w1 -Q fast
My Celery settings are the defaults, except:
CELERY_BROKER_URL = "pyamqp://*****@localhost:5672/mini"
CELERY_TASK_ROUTES = {
    'app1.tasks.task_fast': {"queue": "fast"},
    'app1.tasks.task_slow': {"queue": "slow"},
}
My Django project's celery.py file is:
from __future__ import absolute_import
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'miniclry.settings')
app = Celery("miniclry", backend="rpc", broker="pyamqp://")
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
The __init__.py of my Django project is:
from .celery import app as celery_app
__all__ = ('celery_app',)
The code of my tasks (app1/tasks.py):
import time, logging
from celery import shared_task
from miniclry.celery import app as celery_app

logger = logging.getLogger(__name__)

@shared_task
def task_fast(delay=0.1):
    logger.warning("fast in")
    time.sleep(delay)
    logger.warning("fast out")

@shared_task
def task_slow(delay=30):
    logger.warning("slow in")
    time.sleep(delay)
    logger.warning("slow out")
If I execute the following from a Django management shell, I see that one fast task is only executed after the slow task has finished.
from app1.tasks import task_fast, task_slow
task_slow.delay()
for i in range(30):
    task_fast.delay()
Can anybody help?
I can post the entire test project if that would help; just advise me on the recommended SO way of sharing such a project.
Version info:
I can confirm the issue: there is a bug in this section of the documentation. worker_prefetch_multiplier = 1 does just what it says and sets the worker's prefetch to 1, which means the worker will hold one more task in addition to the one it is currently executing.
To actually disable prefetching, you also need to set task_acks_late = True along with the prefetch setting; see this section of the docs.
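The accounting behind this can be illustrated with a small model. It is a simplified sketch, not Celery's implementation: it assumes the broker caps a worker at concurrency * multiplier unacknowledged messages, and that with early acknowledgement (the default) a task is acknowledged when it starts running, whereas with task_acks_late it stays unacknowledged until it has finished. waiting_behind_running is a hypothetical helper. In this project's Django settings (loaded with namespace='CELERY'), the two options would be spelled CELERY_TASK_ACKS_LATE = True and CELERY_WORKER_PREFETCH_MULTIPLIER = 1.

```python
def waiting_behind_running(concurrency: int, multiplier: int, acks_late: bool) -> int:
    """Tasks a fully busy worker can hold in reserve (simplified model, assumption).

    The broker allows concurrency * multiplier unacknowledged messages.
    Early acks: running tasks are already acknowledged, so they don't count.
    Late acks: running tasks are still unacknowledged, so they use up the limit.
    """
    if concurrency < 1 or multiplier < 1:
        raise ValueError("model assumes concurrency >= 1 and multiplier >= 1")
    limit = concurrency * multiplier
    running_unacked = concurrency if acks_late else 0
    return limit - running_unacked


# W2: one process, multiplier 1, with and without late acknowledgement.
for acks_late in (False, True):
    print(acks_late, waiting_behind_running(1, 1, acks_late))
# False 1
# True 0
```

For W2 this gives one waiting task without task_acks_late, matching the behaviour observed in the question, and none with it, matching the fix described above.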