Grady Woodruff

Reputation: 143

Where to call a celery task on model save

I need to call a Celery task when saving a model, but models.py and tasks.py import from each other and I do not know how to resolve the circular import. Does anyone know another way to structure this so that the imports do not conflict?

models.py
from .tasks import celery_task

class Picture(PolymorphicModel):
    file = models.ImageField()
    processed_file = models.ImageField(blank=True, null=True)
    ...
    def save(self, *args, **kwargs):
        if self.file:
            self.processed_file = celery_task.delay(self.id, other_arg)
        super(Picture, self).save(*args, **kwargs)




tasks.py
from .models import Picture

@task
def celery_task(id, other_arg):
    try:
        picture = Picture.objects.get(id=id)
    except ObjectDoesNotExist:
        picture = None

    if picture:
        return some_other_function(picture.file)

    return None

Upvotes: 0

Views: 4203

Answers (2)

jaume07

Reputation: 51

To complement 2ps's answer: with this code structure you will run into database race conditions. I found this article especially useful for solving them: https://www.vinta.com.br/blog/2016/database-concurrency-in-django-the-right-way/

Race conditions happen when two or more concurrent threads try to access the same memory address (or, in this case, the same data in a database) at the same time.

Here, the Django app and the Celery worker try to access the same Picture instance at the same time. The article describes three ways to solve this; the one that has worked for me is transaction.on_commit(lambda: your_celery_task.delay()), which dispatches the task only after the transaction has committed.

In your case, that would be:

models.py
from .tasks import celery_task
from django.db import transaction

class Picture(PolymorphicModel):
    file = models.ImageField()
    processed_file = models.ImageField(blank=True, null=True)
    ...
    def save(self, *args, **kwargs):
        super(Picture, self).save(*args, **kwargs)
        if self.file:
            transaction.on_commit(lambda: celery_task.delay(self.id))
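
Why deferring the dispatch helps can be illustrated with a toy model. This is only a sketch under loud assumptions: ToyDatabase and worker_lookup are hypothetical stand-ins written for this example, not Django's or Celery's implementation, and the "worker" is just a function that can only see committed rows.

```python
class ToyDatabase:
    """Toy transactional store (hypothetical, NOT Django's implementation)."""

    def __init__(self):
        self.committed = {}    # rows visible to other connections
        self._pending = {}     # rows written but not yet committed
        self._callbacks = []   # functions to run after a successful commit

    def write(self, key, value):
        self._pending[key] = value

    def on_commit(self, callback):
        self._callbacks.append(callback)

    def commit(self):
        self.committed.update(self._pending)
        self._pending.clear()
        callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:
            callback()


def worker_lookup(db, key):
    # Stand-in for the Celery worker: a separate connection that sees
    # only committed rows, like Picture.objects.get(id=...) in the task.
    return db.committed.get(key)


# Eager dispatch: the worker runs before the row is committed.
db = ToyDatabase()
db.write(1, "picture.jpg")
eager_result = worker_lookup(db, 1)
db.commit()

# Deferred dispatch: the lookup is registered and runs after the commit.
db = ToyDatabase()
deferred = []
db.write(1, "picture.jpg")
db.on_commit(lambda: deferred.append(worker_lookup(db, 1)))
db.commit()
deferred_result = deferred[0]

print(eager_result)     # None
print(deferred_result)  # picture.jpg
```

In the eager case the lookup finds nothing, which corresponds to the task in the question falling into its ObjectDoesNotExist branch; in the deferred case the row is already visible when the lookup runs.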

Upvotes: 5

2ps

Reputation: 15936

Note that there is an issue with the way you are calling your task and expecting it to work, but that is out of scope for your question. To fix the circular import, use a local import (inside the method) instead of a global, module-level import:


models.py

class Picture(PolymorphicModel):
    file = models.ImageField()
    processed_file = models.ImageField(blank=True, null=True)
    ...
    def save(self, *args, **kwargs):
        from .tasks import celery_task
        if self.file:
            self.processed_file = celery_task.delay(self.id, other_arg)
        super(Picture, self).save(*args, **kwargs)

tasks.py
from .models import Picture

@task
def celery_task(id, other_arg):
    try:
        picture = Picture.objects.get(id=id)
    except ObjectDoesNotExist:
        picture = None

    if picture:
        return some_other_function(picture.file)

    return None
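
The difference between the global and the local import can be reproduced without Django or Celery. The sketch below is an illustration only: it writes two throwaway packages (the names app_global and app_local are made up for this example) to a temporary directory, with plain Python classes and functions standing in for the model and the task.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-ins for tasks.py and models.py; no Django or Celery.
TASKS = """
from {pkg}.models import Picture

def celery_task(picture_id):
    return "processed %s via %s" % (picture_id, Picture.__name__)
"""

MODELS_GLOBAL = """
from {pkg}.tasks import celery_task  # module-level import: closes the cycle

class Picture:
    def save(self):
        return celery_task(1)
"""

MODELS_LOCAL = """
class Picture:
    def save(self):
        from {pkg}.tasks import celery_task  # resolved when save() is called
        return celery_task(1)
"""


def build_package(root, pkg, models_src):
    pkg_dir = Path(root) / pkg
    pkg_dir.mkdir()
    (pkg_dir / "__init__.py").write_text("")
    (pkg_dir / "models.py").write_text(models_src.format(pkg=pkg))
    (pkg_dir / "tasks.py").write_text(TASKS.format(pkg=pkg))


def try_import(pkg):
    try:
        models = importlib.import_module(pkg + ".models")
    except ImportError:
        return "ImportError"
    return models.Picture().save()


def demo():
    with tempfile.TemporaryDirectory() as root:
        sys.path.insert(0, root)
        try:
            build_package(root, "app_global", MODELS_GLOBAL)
            build_package(root, "app_local", MODELS_LOCAL)
            importlib.invalidate_caches()
            return try_import("app_global"), try_import("app_local")
        finally:
            sys.path.remove(root)


global_result, local_result = demo()
print(global_result)
print(local_result)
```

With the module-level import, tasks.py starts loading while models.py is only partially initialized, so Picture does not exist yet and the import fails. With the local import, models.py has finished loading by the time save() runs, so tasks.py can import Picture normally.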

Upvotes: 2
