gegenschall

Reputation: 123

Immediately access django-celery-results TaskResult after starting

I have several Celery tasks I'm executing within a Django view (more specifically within Django Rest Framework's perform_create method).

What I'm trying to achieve is to immediately (that is, as soon as the task has an id/is in the results backend) access the TaskResult object and do something with it, like this:

tasks = [do_something.s(a) for a in (1, 2, 3, 4,)]
results = group(*tasks).apply_async()

for result in results.children:
    task = TaskResult.objects.get(task_id=result.task_id)
    do_something_with_task_object(task)

Now, this fails with django_celery_results.models.DoesNotExist: TaskResult matching query does not exist.

I have not tried it yet, but I could probably make this work with something like the following snippet. That strikes me as plain wrong and ugly, though, and it also waits until the tasks are finished:

while not all(TaskResult.objects.filter(task_id=t.task_id).exists() for t in results.children):
    pass

Is there some way to make this work in a nice and clean fashion?

Upvotes: 6

Views: 2166

Answers (2)

Loïc

Reputation: 1

I don't know whether this worked for everyone, but with django-celery-results==2.2.0, wrapping the call in transaction.atomic() as a context manager no longer seems to work. Reacting in a post_save signal, on the other hand, seems to be fine.

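# models.py
from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver
from django_celery_results.models import TaskResult

@receiver(post_save, sender=TaskResult)
def after_task_result(sender, instance, created, **kwargs):
    if created:
        transaction.on_commit(lambda: do_something())  # on_commit passes no arguments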

However, inside the signal handler I lose the view's local variables, because only what is stored on the model instance reaches the handler. In that case, the ugly polling code is still what works best.

# views.py

while not TaskResult.objects.filter(task_id=task.id).exists():
    pass  # busy-wait until the result row exists
task = TaskResult.objects.get(task_id=task.id)
# do something more complex with local variables
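
If you do end up polling, it may be worth bounding the loop so the request cannot hang forever if the row never appears. The following is only a sketch under my own assumptions: wait_until is a hypothetical helper (not part of Django, Celery or django-celery-results), and fake_row_exists stands in for the database check so that the snippet runs without a Django project.

```python
import time


def wait_until(predicate, timeout=5.0, interval=0.05):
    """Call predicate() until it returns True or `timeout` seconds pass.

    Returns True if the predicate succeeded, False if the deadline expired.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        # Sleep between checks instead of spinning on the database.
        time.sleep(interval)


# Stand-in for the real existence check (assumption for this sketch):
# the "row" shows up on the third poll.
calls = {"count": 0}


def fake_row_exists():
    calls["count"] += 1
    return calls["count"] >= 3


found = wait_until(fake_row_exists, timeout=1.0, interval=0.01)
missing = wait_until(lambda: False, timeout=0.05, interval=0.01)
```

In the view, the predicate would presumably be something like lambda: TaskResult.objects.filter(task_id=task.id).exists(), and a False return value lets you answer with an error instead of blocking the worker indefinitely.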

Upvotes: 0

gegenschall

Reputation: 123

It turns out that a) the moment you ask a question on StackOverflow, you're able to answer it yourself and b) Django transaction management does everything you need.

If you wrap the call to apply_async in a transaction.atomic() block, all is fine, e.g.:

from django.db import transaction

with transaction.atomic():
    results = group(*tasks).apply_async()

TaskResult.objects.get(task_id=results.children[0].task_id)

Upvotes: 1
