Suresh
Suresh

Reputation: 495

Celery Beat - How to invoke a periodic task manually?

In our production environment, we have a Celery beat configured to perform periodic tasks according to the frequency.

My question is, is there a way for me to start the task that was meant to run at specific time right now?

Any advice or guidance is greatly appreciated.

I tried to add the periodic task using app.add_periodic_task from python interpreter, but it didn't work.

I am looking for some help to run any specific scheduled task at that moment either from Python interpreter or celery command line.

Upvotes: 0

Views: 1189

Answers (2)

Suresh
Suresh

Reputation: 495

I accomplished this through following two scenarios.

  1. Connect to the related docker instance container where the Celery beat is hosted and run the below commands in Python interpreter
>>> from mymodule.celerybeat import cleanup
>>> cleanup.apply_async()
<AsyncResult: afd71a7f-2246-4946-8572-267f3494df31>
  1. Using the Celery command line through subcommand call
celery -A myceleryapp:app call mymodule.celerybeat.cleanup

Note: Here myceleryapp is the app name and cleanup is the periodic task.

Upvotes: 1

Molossus
Molossus

Reputation: 519

I was facing the same problem. I am using Robot Framework to test our Flask application and I needed to manually trigger a periodic task as part of a test case. To do this I created an endpoint that could take a Redis key as a parameter. This fetched the task definition from Redis and used this to fetch the Celery task which could be then executed with the arguments from the task definition as follows:

    def post(self, job_id):

        """Run a new periodic task"""

        task: PeriodicTaskSchedule = PeriodicTaskSchedule.query.filter_by(id=job_id).first()

        if not task:
            abort(HTTPStatus.NOT_FOUND, "Periodic task not found")

        key = f"{celery.conf.redbeat_key_prefix}{job_id}"
        entry = RedBeatSchedulerEntry.from_key(key, app=celery)
        task_function = celery.tasks[entry.task]
        task_function.apply_async(args=entry.args, kwargs=entry.kwargs)

        fresh_jwt = refresh_access_token(request)
        return ({"ran": job_id, "task name": entry.kwargs["name"]}, 
                HTTPStatus.OK, {"x-token": fresh_jwt})

Upvotes: 0

Related Questions