ecline6

Reputation: 896

Turn Off Celery Tasks

I'm trying to find a way to be able to turn celery tasks on/off from the django admin. This is mostly to disable tasks that call external services when those services are down or have a scheduled maintenance period.

For my periodic tasks, this is easy, especially with django-celery. But for tasks that are called on demand I'm having some trouble. Currently I'm exploring just storing an on/off status for various tasks in a TaskControl model and then just checking that status at the beginning of task execution, returning None if the status is False. This makes me feel dirty due to all the extra db lookups every time a task kicks off. I could use a cache backend that isn't the db, but it seems a little overkill to add caching just for these handful of key/value pairs.
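For comparison, the per-call DB lookup could also be amortized in process memory with a small TTL wrapper, no cache backend needed. This is only a sketch (plain Python, not Django-specific); `get_status` and the `calls` counter are illustrative stand-ins for the real `TaskControl` query:

```python
import time

def ttl_cached(ttl_seconds):
    """Cache a zero-argument loader's result for `ttl_seconds`."""
    def decorator(loader):
        state = {"value": None, "at": None}
        def wrapper(refresh=False):
            now = time.monotonic()
            if refresh or state["at"] is None or now - state["at"] >= ttl_seconds:
                state["value"] = loader()
                state["at"] = now
            return state["value"]
        return wrapper
    return decorator

calls = []

@ttl_cached(ttl_seconds=300)
def get_status():
    # Hypothetical stand-in for TaskControl.objects.get(pk=1).some_status
    calls.append(1)
    return True
```

With a 5-minute TTL, repeated task invocations within the window hit the cached value instead of the database.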

in models.py

# this is a singleton model. singleton code bits omitted for brevity.
class TaskControl(models.Model):
    some_status = models.BooleanField(default=True)
    # more statuses

in tasks.py

from myapp.models import TaskControl  # adjust to wherever TaskControl lives

@celery.task(ignore_result=True)
def some_task():
    task_control = TaskControl.objects.get(pk=1)
    if not task_control.some_status:
        return None

    # otherwise execute task as normal

What is a better way to do this?

Upvotes: 2

Views: 5179

Answers (1)

Vasiliy Faronov

Reputation: 12310

Option 1. Try your simple approach. See if it affects performance. If it doesn’t, lose the “dirty” feeling.

Option 2. Cache in process memory with a singleton. Add freshness information to your TaskControl model:

from datetime import datetime

class TaskControl(models.Model):
    some_status = models.BooleanField(default=True)
    # more statuses
    expires = models.DateTimeField()
    check_interval = models.IntegerField(default=5 * 60)

    def is_stale(self):
        # `retrieved` is set in task_ctl.py when the row is fetched;
        # it is not a database field.
        return (
            (datetime.utcnow() >= self.expires) or
            ((datetime.utcnow() - self.retrieved).total_seconds() >= self.check_interval))

Then in a task_ctl.py:

from datetime import datetime

from myapp.models import TaskControl  # adjust to your models' import path

_control = None

def is_enabled():
    global _control
    if (_control is None) or _control.is_stale():
        _control = TaskControl.objects.get(pk=1)
        # There's probably a better way to set `retrieved`,
        # maybe with a signal or a `Model` override,
        # but this should work.
        _control.retrieved = datetime.utcnow()
    return _control.some_status

Option 3. Like option 2, but instead of time-based expiration, use Celery’s remote control to force all workers to reload the TaskControl (you’ll have to write your own control command, and I don’t know if all the internals you will need are public APIs).

Option 4, only applicable if all your Celery workers run on a single machine. Store the on/off flag as a file on that machine’s file system. Query its existence with os.path.exists (that should be a single stat() or something, cheap enough). If the workers and the admin panel are on different machines, use a special Celery task to create/remove the file.

Option 5. Like option 4, but on the admin/web machine: if the file exists, just don’t queue the task in the first place.

Upvotes: 2
