user620316

Reputation: 445

In Celery, how can I keep long-delayed tasks from blocking newer ones?

I have two kinds of tasks. Task A is generated by celerybeat every hour. It runs immediately, and generates a thousand (or many thousand) instances of Task B, each of which has an ETA of one day in the future.
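To make the setup concrete, here is a minimal sketch of what the two tasks look like (the module name, broker URL, and the do_work placeholder are illustrative, not my actual code):

    from celery import Celery

    app = Celery('tasks', broker='amqp://localhost')

    @app.task
    def task_a():
        # Runs hourly via celerybeat; fans out many Task B instances,
        # each scheduled to execute roughly one day from now.
        for item in range(1000):
            task_b.apply_async(args=(item,), countdown=24 * 60 * 60)

    @app.task
    def task_b(item):
        do_work(item)  # stand-in for the real work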

Upon startup, an instance of Task A runs and generates a thousand Bs. And from then on, nothing happens. I should see another A running each hour, with another thousand Bs. But in fact I see nothing.

When things freeze, rabbitmqctl shows 1000 messages, with 968 ready and 32 unacknowledged. An hour later there are 1001 messages: 969 ready and 32 unacknowledged. And so on; every hour one new message is classified as ready. Presumably what's happening is that the worker prefetches 32 messages but can't act on them, because their ETAs are still in the future. Meanwhile, newer tasks that should run right now can't be run.

What is the right way to handle this? I'm guessing that I need multiple workers, and maybe multiple queues (but I'm not sure of the latter point). Is there a simpler way? I've tried fiddling with CELERYD_PREFETCH_MULTIPLIER and -Ofair (as discussed here: http://celery.readthedocs.org/en/latest/userguide/optimizing.html) but can't get it to work. Is my question the same as this one: [Django Celery] Celery blocked doing IO tasks?
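For reference, the knobs I was experimenting with look roughly like this (a sketch; 'proj' is a placeholder for my application module):

    # In the Celery configuration module:
    CELERYD_PREFETCH_MULTIPLIER = 1   # prefetch as few messages as possible

    # And the worker started with the fair scheduling optimization:
    #   celery -A proj worker -Ofair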

In any case: I can address this issue only because I know a lot about the nature of the tasks and their timing. Doesn't it seem like a design flaw that enough tasks with a future ETA can lock up the whole system? If I wait a few hours, then kill and restart the worker, it once again grabs the first 32 tasks and freezes up, even though at this point there are tasks in the queue that are ready to run right now. Shouldn't some component be smart enough to look at ETAs and ignore tasks that aren't runnable yet?

ADDENDUM: I now think that the problem is a known bug when RabbitMQ 3.3 is used with Celery 3.1.0. More information here: https://groups.google.com/forum/#!searchin/celery-users/countdown|sort:date/celery-users/FiAAESOzezA/499OH-pylacJ

After updating to Celery 3.1.1, things seem better. Task A runs hourly (well, it has for a couple of hours) and schedules its copies of Task B. Those seem to be filling up the worker: the number of unacknowledged messages continues to grow. I'll have to see if it can grow without bound.

Upvotes: 13

Views: 6539

Answers (2)

Prat

Reputation: 505

How many workers do you currently have running, and with what concurrency?

Increasing the concurrency on your workers might help. If thread x is stuck on a Task A that is taking a long time, or is in a wait state, another thread could work on the other prefetched tasks.
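For example (assuming the application module is called proj, which is just a placeholder), concurrency can be raised either on the command line or in the configuration:

    # Start the worker with more processes/threads:
    #   celery -A proj worker --concurrency=10
    # or set it in the Celery configuration module:
    CELERYD_CONCURRENCY = 10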

http://celery.readthedocs.io/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-c

Upvotes: 1

Kyle Owens

Reputation: 1361

It seems like this is an issue that can be solved with routing: http://celery.readthedocs.org/en/latest/userguide/routing.html

When using routing, you can have multiple queues that are populated with different types of tasks. If you don't want Task B to block further Task A instances, you could route them to separate queues with different priorities, so that your workers keep working through the large queue full of Task Bs, but when a Task A arrives it is pulled by the next available worker.

An added benefit is that you can assign more workers to heavily filled queues, and those workers will only pull from the designated high-volume queue.
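A sketch of what that could look like (queue names and task paths are placeholders, not your actual module names):

    # Route the two task types to separate queues:
    CELERY_ROUTES = {
        'tasks.task_a': {'queue': 'hourly'},
        'tasks.task_b': {'queue': 'delayed'},
    }

    # Then run dedicated workers, each consuming only its own queue:
    #   celery -A proj worker -Q hourly  -c 1
    #   celery -A proj worker -Q delayed -c 8

That way the thousands of delayed Task Bs never sit in front of a ready-to-run Task A in the same queue.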

Upvotes: 2
