Daviddd
Daviddd

Reputation: 791

celery 4.0.0 and Class based task workflow

Based on these examples:

https://blog.balthazar-rouberol.com/celery-best-practices

https://shulhi.com/2015/10/13/class-based-celery-task/

Class based Task in django celery

I would like to create something similar:

class CalculationWorker(Task):

    def __init__(self, id_user):
        self._id_user = id_user
        self._user = get_object_or_404(User, pk=self._id_user)

    # I need to understand if the bind work or if it's needed
    def _bind(self, app):
        return super(self.__class__, self).bind(celery_app)

    def _retrieve_some_task(self):
        # long calculation

    def _long_run_task(self):
        # long calculation
        self._retrieve_some_task()

    # Main entry
    def run(self):
        self._long_run_task()


# 
def run_job():  
    worker = CalculationWorker(id_user=323232)
    task = worker.apply_async()

The documentation seems to say it's possible (anyway it's not clear to me) http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classes. It even says:

""" This means that the init constructor will only be called once per process, and that the task class is semantically closer to an Actor. ""

but http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#the-task-base-class-no-longer-automatically-register-tasks explictly says: "The best practice is to use custom task classes only for overriding general behavior, and then using the task decorator to realize the task".

As result I got a NotRegistered Exception due to this https://github.com/celery/celery/issues/3548, but adding app.tasks.register(CalculationWorker()) didn't solve it. I'm using Django 1.10.X and Celery 4.0.0

Is that approach still valid?

Thanks

Upvotes: 3

Views: 4996

Answers (2)

vvkuznetsov
vvkuznetsov

Reputation: 1133

If you use celery-4.0.1, then you should check documentation that chris 1 pointed out docs

The Task class is no longer using a special meta-class that automatically registers the task in the task registry.

Now you should register your task like this

class CustomTask(Task):
    def run(self):
        print('running')
app.register_task(CustomTask())

Upvotes: 9

shadow chris
shadow chris

Reputation: 391

I'm not sure if this is the best solution, but you can use a workaround to get the behavior you want. I'm doing something similar so that I can stick an error handler on the task in a clearer way.

From the docs 1

The best practice is to use custom task classes only for overriding general behavior, and then using the task decorator to realize the task:

@app.task(bind=True, base=CustomTask) 
def custom(self):
    print('running')

But you can put all of your task code inside CustomTask and just leave a stub in the decorated task declaration. You do have to call into the superclass of your task in the stub like so:

@app.task(bin=True, base=CustomTask)
def custom(self, *args):
    super(type(self), self).run(*args)

You then treat the decorated function declaration as a way to call into celery's task registration machinery. I hope something cleaner comes out in 5.0 though.

Upvotes: 8

Related Questions