Reputation: 12590
In Celery, I'm running a main task that runs one subtask for each item it gets from a query. The subtasks should run in parallel. In the UI I have a progress bar that shows how many subtasks are done out of the total. I'm updating the main task's state to feed the progress bar. My problem is that the main task ends right after pushing all the subtasks to the broker, so I can't update its state anymore. I'd like the main task to wait until all the subtasks are done. Is that possible? Are there other solutions? Here's my pseudocode (the real code doesn't use global ;-)).
total = 0
done = 0

@task(ignore_result=True)
def copy_media(path):
    global total, done
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    for document in documents:
        process_doc.delay(document, path, copy_media)

@task(ignore_result=True)
def process_doc(document, path, copy_media):
    global total, done
    # Do some stuff
    done += 1
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
Upvotes: 5
Views: 5154
Reputation: 16029
I don't know which version of Celery you are running, but you could have a look at group subtasks (new in 3.0).
Upvotes: -1
Reputation: 6044
You can use memcached-backed caching to store the number of completed tasks. There's even cache.incr in the Django cache API for atomic increments, so concurrent updates of the count don't clobber each other.
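To make the counter idea concrete, here is a minimal sketch in plain Python. It does not use Django or memcached: InMemoryCache is a hypothetical stand-in whose set/get/incr methods mimic the shape of the Django cache calls, and threads stand in for Celery subtasks. The key names ("job1:done", "job1:total") are made up for illustration.

```python
import threading


class InMemoryCache:
    """Hypothetical stand-in for a cache backend with an atomic incr()."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def set(self, key, value):
        with self._lock:
            self._data[key] = value

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)

    def incr(self, key, delta=1):
        # Read-modify-write under a lock, so concurrent callers never lose an update.
        with self._lock:
            self._data[key] += delta
            return self._data[key]


def run_job(cache, job_id, n_docs):
    """Simulate a main task that fans out n_docs subtasks and returns (done, total)."""
    cache.set(f"{job_id}:total", n_docs)
    cache.set(f"{job_id}:done", 0)

    def process_doc(_doc):
        # ... do the real work here, then bump the shared counter ...
        cache.incr(f"{job_id}:done")

    threads = [threading.Thread(target=process_doc, args=(i,)) for i in range(n_docs)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # A progress view would read these two keys instead of the main task's state.
    return cache.get(f"{job_id}:done"), cache.get(f"{job_id}:total")
```

With this layout the progress bar reads the two counters directly, so the main task is free to finish as soon as it has queued the subtasks.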
Also, keeping the main task running until all subtasks complete is a bad idea, because you're basically blocking one of the Celery workers for a long time. If Celery is run with a single worker process, this results in a deadlock: the main task waits for subtasks that can never start.
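The starvation problem can be reproduced without Celery. The sketch below is only an analogy: a ThreadPoolExecutor plays the role of the worker pool, and parent/child are hypothetical names for the main task and a subtask. A timeout is used so the demo returns instead of hanging forever.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def demo(workers):
    """Run a parent job that waits on a child job inside the same pool."""
    pool = ThreadPoolExecutor(max_workers=workers)

    def child():
        return "done"

    def parent():
        # The child is queued on the same pool the parent is running in.
        future = pool.submit(child)
        try:
            return future.result(timeout=0.5)
        except TimeoutError:
            # No free worker ever picked up the child while the parent was waiting.
            return "starved"

    result = pool.submit(parent).result()
    pool.shutdown()
    return result
```

With one worker the parent occupies the only slot and the child cannot start until the parent gives up; with two workers the child runs right away. A real Celery worker has no such timeout by default, so the wait would simply never end.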
Upvotes: 0
Reputation: 12590
I found a way using TaskSet. But I'm not totally satisfied, because I can't ignore the results of the subtasks. If I ignore the result for the process_doc task, results.ready() always returns False, results.completed_count() always returns 0, etc. Here's the code:
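import time

from celery.states import STARTED
from celery.task import task
from celery.task.sets import TaskSet  # Celery 2.x/3.x API

# Document is the application's own Django model; import it from your app.

@task(ignore_result=True)
def copy_media(path):
    # Report an initial state before the total is known.
    total, done = 0, 0
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    job = TaskSet(tasks=[process_doc.subtask((document, path))
                         for document in documents])
    results = job.apply_async()
    doc_name = ''
    # Poll the subtask results and publish progress until all are finished.
    while not results.ready():
        done = results.completed_count()
        if done:
            # Walk backwards to find a finished subtask to report.
            last = done - 1
            for idx in xrange(last, -1, -1):
                if results[idx].ready():
                    doc_name = results[idx].result
                    break
        copy_media.update_state(state=STARTED, meta={'total': total, 'done': done, 'doc-name': doc_name})
        time.sleep(0.25)

# Results must NOT be ignored here, or ready()/completed_count() never update.
@task()
def process_doc(document, path):
    # Do some stuff
    return document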
Upvotes: 3