Jasper van den Bosch
Jasper van den Bosch

Reputation: 3218

Celery dynamic tasks / hiding Celery implementation behind an interface

I am trying to figure out how to implement my asynchronous jobs with Celery, without tying them to the Celery implementation.

If I have an interface that accepts objects to schedule, such as callables (Or an object that wraps a callable):

ITaskManager(Interface):
    def schedule(task):
        #eventually run task

And I might implement it with the treading module:

ThreadingTaskManager(object)
    def schedule(task):
        Thread(task).start() # or similar

But it seems this couldn't be done with celery, am I right?

Upvotes: 4

Views: 792

Answers (1)

Jasper van den Bosch
Jasper van den Bosch

Reputation: 3218

Perhaps one, albeit quite ugly, solution might be to define one celery task which dynamically loads the task object that is passed as an argument:

@celery.task
def taskrunner(taskname):
    taskModule = __import__(taskname)
    taskModule.run()

CeleryTaskManager(object)
    def schedule(task):
        taskrunner.delay(task.__file__)


from mytask import run

CeleryTaskManager().schedule(run)

Upvotes: 3

Related Questions