freethebees

Add and delete Celery periodic tasks at runtime

I've been battling with this task all day.

I have a Django app. I use Celery for asynchronous tasks. Occasionally, I want to create a periodic task. The number of times the task will run is unknown, but it will need to be deleted later. So the task could look like this:

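from celery import shared_task
from django_celery_beat.models import PeriodicTask

@shared_task
def foobar_task(id):
    if this_should_run:  # placeholder: condition for continuing to run
        do_task()        # placeholder: the actual work
    else:
        # Delete the schedule entry so beat stops sending this task
        PeriodicTask.objects.get(name='{} task'.format(id)).delete()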

My app is running. I have celery beat running in a Docker container, started with celery --app=myproject beat --loglevel=info --scheduler=django. I have another container running the standard celery worker.

So now I want to dynamically create my periodic task. I have a view/API endpoint that triggers something like this:

from django_celery_beat.models import IntervalSchedule, PeriodicTask

schedule, _ = IntervalSchedule.objects.get_or_create(every=15, period=IntervalSchedule.SECONDS)
PeriodicTask.objects.create(interval=schedule,
                            name='{} task'.format(id),
                            task='myapp.tasks.foobar_task')
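A separate point worth checking: foobar_task takes an id argument, but the PeriodicTask above is created without any arguments, so the worker would receive the task with none. PeriodicTask stores positional arguments as a JSON-encoded list in its args field. A minimal sketch of a hypothetical helper (periodic_task_kwargs is an illustrative name, not part of django_celery_beat) that builds the create() keyword arguments including args:

```python
import json


def periodic_task_kwargs(task_id, schedule):
    """Build keyword arguments for PeriodicTask.objects.create().

    Hypothetical helper: the keys match django_celery_beat's PeriodicTask
    fields, but the function itself is only an illustrative sketch.
    """
    return {
        "interval": schedule,
        "name": "{} task".format(task_id),
        "task": "myapp.tasks.foobar_task",
        # args holds the task's positional arguments as a JSON-encoded list
        "args": json.dumps([task_id]),
    }
```

In the view this would be used as PeriodicTask.objects.create(**periodic_task_kwargs(id, schedule)).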

In the Django admin, I can see that the periodic task has been created. However, the logs of both the celery worker container and the celery beat container show nothing happening.

Why is celery beat not picking up that there's a new periodic task? I don't want to have to restart celery beat every time a new task is created or deleted.

Note: I am using Django 1.11.2, PostgreSQL, Celery 4.0.2, Django Celery Beat 1.0.1.

Answers (1)

freethebees

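You can fix this with a custom scheduler that checks the database for changes on every beat tick and merges added or removed tasks into the running schedule. The one below is adapted from this answer.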

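from django_celery_beat.schedulers import DatabaseScheduler


class AutoUpdateScheduler(DatabaseScheduler):

    def tick(self, *args, **kwargs):
        if self.schedule_changed():
            # Persist in-memory run state before re-reading the database
            self.sync()
            # Force the base Scheduler.tick() to rebuild its heap of entries
            self._heap = None
            new_schedule = self.all_as_schedule()

            # Diff by task name; entries present in both are left untouched.
            # An empty new_schedule means every task was deleted or disabled,
            # so removals must still be applied in that case.
            to_add = [x for x in new_schedule if x not in self.schedule]
            to_remove = [x for x in self.schedule if x not in new_schedule]
            for key in to_add:
                self.schedule[key] = new_schedule[key]
            for key in to_remove:
                del self.schedule[key]

        super(AutoUpdateScheduler, self).tick(*args, **kwargs)

    @property
    def schedule(self):
        # Read the schedule from the database once; later changes are
        # merged in by tick() instead of replacing the whole dict
        if not self._initial_read and not self._schedule:
            self._initial_read = True
            self._schedule = self.all_as_schedule()
        return self._schedule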
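Stripped of Celery, the heart of the tick() override is a key-level diff between the schedule held in memory and the one just read from the database. A minimal sketch of that diff, using plain dicts in place of schedule entries (an assumption for illustration; merge_schedule is a hypothetical name):

```python
def merge_schedule(current, fresh):
    """Apply additions and removals from `fresh` to `current` in place.

    Plain dicts stand in for beat schedules (task name -> entry).
    Keys present in both are left untouched, keeping their in-memory state.
    """
    to_add = [key for key in fresh if key not in current]
    to_remove = [key for key in current if key not in fresh]
    for key in to_add:
        current[key] = fresh[key]
    for key in to_remove:
        del current[key]
    return to_add, to_remove


current = {"1 task": "old entry", "2 task": "entry"}
fresh = {"2 task": "edited entry", "3 task": "entry"}
print(merge_schedule(current, fresh))  # (['3 task'], ['1 task'])
print(current)  # {'2 task': 'entry', '3 task': 'entry'}
```

Because keys present in both dicts keep their in-memory value, editing the interval of an existing task is not picked up by this approach; only additions and deletions are.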

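Save the AutoUpdateScheduler class in a module that celery beat can import (the command below assumes myproject/scheduler.py), then point celery beat at it when you run it: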

celery --app=myproject beat --loglevel=info --scheduler=myproject.scheduler.AutoUpdateScheduler
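The self._heap = None line matters because, as far as I can tell from Celery 4.0's Scheduler.tick, beat builds a heap of schedule entries only when self._heap is None and then keeps reusing it, so entries added to the schedule afterwards are never queued. The toy model below (ToyScheduler is hypothetical, not Celery's class) sketches that caching behaviour:

```python
import heapq


class ToyScheduler:
    """Toy model of a beat scheduler that caches its run queue as a heap.

    A simplified illustration only, not Celery's actual Scheduler class.
    """

    def __init__(self, schedule):
        # Task name -> next run time in seconds (hypothetical representation)
        self.schedule = dict(schedule)
        self._heap = None

    def queued_names(self):
        # The heap is built only when _heap is None; afterwards it is reused,
        # so entries added to self.schedule later are not queued.
        if self._heap is None:
            self._heap = [(when, name) for name, when in self.schedule.items()]
            heapq.heapify(self._heap)
        return sorted(name for _, name in self._heap)


scheduler = ToyScheduler({"1 task": 15})
print(scheduler.queued_names())  # ['1 task']
scheduler.schedule["2 task"] = 30
print(scheduler.queued_names())  # ['1 task'] (stale cached heap)
scheduler._heap = None
print(scheduler.queued_names())  # ['1 task', '2 task']
```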