Etienne

Reputation: 12590

In Celery, how do I update the state of a main task until all of its subtasks are done?

In Celery I'm running a main task that launches one subtask for each item it gets from a query. The subtasks should run in parallel. On the UI I have a progress bar that shows how many subtasks are done out of the total, and I'm updating the main task's state to feed that progress bar. My problem is that the main task ends right after pushing all the subtasks to the broker, so I can't update its state anymore. I wish the main task could wait until all the subtasks were done. Is that possible? Any other solutions? Here's my pseudo-code (the real code doesn't use globals ;-)).

total = 0
done = 0

@task(ignore_result=True)
def copy_media(path):
    global total, done
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    for document in documents:
        # Dispatch one parallel subtask per document.
        process_doc.delay(document, path, copy_media)

@task(ignore_result=True)
def process_doc(document, path, copy_media):
    global total, done
    # Do some stuff
    done += 1
    # Pseudo-code only: each worker process has its own copy of these
    # globals, so this count wouldn't be shared in real code.
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
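For completeness, the UI side would typically poll the main task's state and turn the `meta` dict into a number for the progress bar. A minimal sketch, assuming the view fetches `meta` via `AsyncResult(task_id).info` (shown as a comment); `as_percent` is a hypothetical helper, not part of Celery:

```python
# On the UI side you would fetch the meta dict with something like:
#     from celery.result import AsyncResult
#     meta = AsyncResult(task_id).info or {}
# and then convert it into a value the progress bar can render.

def as_percent(meta):
    """Convert a {'total': ..., 'done': ...} meta dict to a 0-100 integer."""
    total = meta.get('total', 0)
    done = meta.get('done', 0)
    if not total:
        return 0
    return int(100 * done / float(total))

print(as_percent({'total': 8, 'done': 2}))  # 25
```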

Upvotes: 5

Views: 5154

Answers (3)

RickyA

Reputation: 16029

I don't know which version of Celery you are running, but you could have a look at Group subtasks (new in 3.0).
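The pattern a group gives you (dispatch N subtasks, then poll `ready()`/`completed_count()` on the collective result) has the same shape as this stdlib analogue with `concurrent.futures`; this is just to illustrate the wait-and-count idea, not actual Celery code:

```python
# Stdlib analogue of dispatching N parallel jobs and counting completions,
# the same shape as celery group / GroupResult.completed_count().
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_doc(document):
    # Stand-in for the real subtask.
    return document

documents = ['doc-%d' % i for i in range(5)]

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(process_doc, doc) for doc in documents]
    done = 0
    for future in as_completed(futures):  # yields each job as it finishes
        done += 1
        # Here you would update the main task's state with done/total.
print(done)  # 5
```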

Upvotes: -1

Alexander Lebedev

Reputation: 6044

You can use a memcached-backed cache to store the number of completed tasks. There's even cache.incr in the Django cache API for atomic increments, which makes sure concurrent updates of the count don't step on each other.

Also, keeping the main task running until all subtasks complete is a bad idea, because you're basically blocking one of the Celery workers for a long time. If Celery is run with a single worker process, this results in a deadlock that never resolves.
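To illustrate the atomic-increment idea: with Django each subtask would just call `cache.incr('done')`. Below is a minimal in-memory stand-in (the `LockingCache` class is hypothetical, only there to show why the increment must be atomic under concurrency):

```python
import threading

class LockingCache(object):
    """In-memory stand-in for a memcached-backed Django cache with incr()."""
    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def incr(self, key):
        # Atomic read-modify-write, like memcached's incr command.
        with self._lock:
            self._data[key] += 1
            return self._data[key]

cache = LockingCache()
cache.set('done', 0)

def worker():
    for _ in range(1000):
        cache.incr('done')  # each subtask bumps the shared counter

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(cache._data['done'])  # 4000
```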

Upvotes: 0

Etienne

Reputation: 12590

I found a way using TaskSet, but I'm not totally satisfied, because I can't ignore the results of the subtasks. If I set ignore_result on the process_doc task, results.ready() always returns False, results.completed_count() always returns 0, and so on. Here's the code:

import time

from celery.task.sets import TaskSet

@task(ignore_result=True)
def copy_media(path):
    total = done = 0
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    # One subtask per document, all dispatched at once.
    job = TaskSet(tasks=[process_doc.subtask((document, path))
                         for document in documents])
    results = job.apply_async()
    doc_name = ''
    while not results.ready():
        done = results.completed_count()
        if done:
            # Find the most recently finished subtask to report its name.
            last = done - 1
            for idx in xrange(last, -1, -1):
                if results[idx].ready():
                    doc_name = results[idx].result
                    break
        copy_media.update_state(state=STARTED,
                                meta={'total': total, 'done': done,
                                      'doc-name': doc_name})
        time.sleep(0.25)

@task()  # results must not be ignored, or completed_count() stays at 0
def process_doc(document, path):
    # Do some stuff
    return document

Upvotes: 3
