pt12lol

Reputation: 2441

Limit the number of tasks queued on a Celery worker

I have the most simple example of Celery task (case.py):

import time
import celery


app = celery.Celery('case', broker='redis://localhost')

@app.task
def do_sth():
    time.sleep(5)

And I create a few tasks instances:

>>> import case
>>> tasks_list = [case.do_sth.delay() for i in range(4)]

Then I would like to run a dynamic number of workers, each with the --concurrency=1 parameter. It is crucial for my case that each worker runs only one task at a time, and that I can add workers while tasks are still queued on the server. If some tasks have no worker yet and a new worker is added, the new worker should take over the queued (pending but not yet started) tasks. But when I run $ celery -A case worker --loglevel=info --concurrency=1 I get the following logs (excluding the Celery logo):

[tasks]
  . case.do_sth

[2017-02-21 10:03:18,454: INFO/MainProcess] Connected to redis://localhost:6379//
[2017-02-21 10:03:18,459: INFO/MainProcess] mingle: searching for neighbors
[2017-02-21 10:03:19,471: INFO/MainProcess] mingle: all alone
[2017-02-21 10:03:19,480: INFO/MainProcess] celery@pt0 ready.
[2017-02-21 10:03:19,658: INFO/MainProcess] Received task: case.do_sth[0c7b0f8c-d1f8-4cd8-a100-21ef6654e04c]
[2017-02-21 10:03:19,660: INFO/MainProcess] Received task: case.do_sth[f97ad614-017b-4a6c-90df-89dbed63e39b]
[2017-02-21 10:03:19,662: INFO/MainProcess] Received task: case.do_sth[b0166022-196f-451b-bcb6-78cdf0558803]
[2017-02-21 10:03:19,664: INFO/MainProcess] Received task: case.do_sth[b097e191-5bc4-44d9-bdcd-8aa74501e95d]
[2017-02-21 10:03:24,667: INFO/PoolWorker-1] Task case.do_sth[0c7b0f8c-d1f8-4cd8-a100-21ef6654e04c] succeeded in 5.006301835s: None
[2017-02-21 10:03:29,675: INFO/PoolWorker-1] Task case.do_sth[f97ad614-017b-4a6c-90df-89dbed63e39b] succeeded in 5.005384011s: None
[2017-02-21 10:03:34,683: INFO/PoolWorker-1] Task case.do_sth[b0166022-196f-451b-bcb6-78cdf0558803] succeeded in 5.005373027s: None
[2017-02-21 10:03:39,690: INFO/PoolWorker-1] Task case.do_sth[b097e191-5bc4-44d9-bdcd-8aa74501e95d] succeeded in 5.00531687s: None

Meanwhile, I started another worker (using the same command), but it did nothing:

[tasks]
  . case.do_sth

[2017-02-21 10:03:20,321: INFO/MainProcess] Connected to redis://localhost:6379//
[2017-02-21 10:03:20,326: INFO/MainProcess] mingle: searching for neighbors
[2017-02-21 10:03:21,339: INFO/MainProcess] mingle: all alone
[2017-02-21 10:03:21,352: INFO/MainProcess] celery@pt0 ready.

If you check the timestamps, you can see that the second worker started less than five seconds after the first one, yet all four tasks were already reserved by the first worker.

Is there a way (some worker or Celery option) to limit the number of tasks queued on a single worker to exactly one?

Upvotes: 4

Views: 6265

Answers (1)

mic4ael

Reputation: 8300

I think what you are looking for is called the prefetch limit. It effectively reduces the number of tasks a worker can reserve at a given time. From the Celery documentation:

The prefetch limit is a limit for the number of tasks (messages) a worker can reserve for itself

Upvotes: 4
