Reputation: 123
I have several Celery tasks I'm executing within a Django view (more specifically, within Django Rest Framework's perform_create method).
What I'm trying to achieve is to immediately (that is, as soon as the task has an id/is in the results backend) access the TaskResult
object and do something with it, like this:
from celery import group
from django_celery_results.models import TaskResult

tasks = [do_something.s(a) for a in (1, 2, 3, 4)]
results = group(*tasks).apply_async()
for result in results.children:
    task = TaskResult.objects.get(task_id=result.task_id)
    do_something_with_task_object(task)
Now, this fails with django_celery_results.models.DoesNotExist: TaskResult matching query does not exist.
I haven't tried it yet, but I could probably make this work with something like the following snippet. That strikes me as plain wrong and ugly, though, and it also blocks until the tasks have finished:
while not all([TaskResult.objects.filter(task_id=t.task_id).exists() for t in results.children]):
    pass
Is there some way to make this work in a nice and clean fashion?
Upvotes: 6
Views: 2166
Reputation: 1
I don't know if it worked for everyone, but with django-celery-results==2.2.0, wrapping the call in the transaction context manager doesn't seem to work anymore. On the other hand, in a post_save signal, it seems OK.
# models.py
@receiver(post_save, sender=TaskResult)
def after_task_result(sender, instance, created, **kwargs):
    if created:
        # on_commit calls its callback with no arguments
        transaction.on_commit(lambda: do_something())
However, inside the signal handler I lose access to the view's local variables, since only what is passed at model creation reaches the signal. If you need those variables, the ugly code is still what works best.
# views.py
while not TaskResult.objects.filter(task_id=task.id).exists():
    pass
task = TaskResult.objects.get(task_id=task.id)
# do something more complex with local variables
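If you do go the polling route, a tight `while ...: pass` loop hammers the database and never gives up. Here is a rough sketch of a bounded version that sleeps between checks and stops after a timeout. The helper name `wait_until` and its `timeout`/`interval` parameters are my own invention, not part of Celery or django-celery-results, and the defaults are arbitrary.

```python
import time


def wait_until(predicate, timeout=5.0, interval=0.1):
    """Poll predicate() until it returns True or `timeout` seconds elapse.

    Hypothetical helper: returns True if the predicate succeeded,
    False if the timeout was reached first.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        # Sleep between checks instead of spinning on the database.
        time.sleep(interval)


# Assumed usage in the view (needs Django and django-celery-results):
# found = wait_until(lambda: TaskResult.objects.filter(task_id=task.id).exists())
# if found:
#     task_result = TaskResult.objects.get(task_id=task.id)
```

This still blocks the request while waiting, so it only softens the ugly loop; it does not replace a signal or transaction-based approach.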
Upvotes: 0
Reputation: 123
It turns out that a) the moment you ask a question on StackOverflow, you're able to answer it yourself and b) Django transaction management does everything you need.
If you wrap the call to task.apply_async in an atomic wrapper, all is fine, e.g.
from django.db import transaction

with transaction.atomic():
    results = group(*tasks).apply_async()
TaskResult.objects.get(task_id=results.children[0].task_id)
Upvotes: 1