I can't figure out why my task is considered done before all of its subtasks are done. I start it with:
tasks.scan_user.delay(1)
CODE:
from celery import chain, task

@task()
def scan_chunk(ids):
    occs = Occurence.objects.filter(product_id__in=ids)
    result = scan_occurences_dummy_pool((x.id, x.url, x.xpath) for x in occs)
    return result

@task()
def scan_user(id):
    chunks = generate_chunks_from_user(id)
    # chain the per-chunk tasks so they run one after another
    ch = chain(scan_chunk.si([x.id for x in chunk]) for chunk in chunks)
    return ch()  # runs the chain and returns its AsyncResult
This is the Celery output. As you can see, scan_user succeeded before all of the scan_chunks were done, which is a problem because I want to use scan_user in another chain.
[2017-02-09 14:27:03,493: INFO/MainProcess] Received task: engineapp.tasks.scan_user[ed358a98-a685-4002-baac-993fdc7b64cf]
[2017-02-09 14:27:05,721: INFO/MainProcess] Received task: engineapp.tasks.scan_chunk[35b74e01-f9fa-471f-8c20-ecbf99a89201]
[2017-02-09 14:27:06,740: INFO/MainProcess] Task engineapp.tasks.scan_user[ed358a98-a685-4002-baac-993fdc7b64cf] succeeded in 3.24300003052s: <AsyncResult: 442f9373-d983-4696-a42a-ba42a8ce7761>
[2017-02-09 14:27:22,178: INFO/MainProcess] Received task: engineapp.tasks.scan_chunk[36a94ad4-3c9e-4f7d-a040-5c2a617a0d8f]
[2017-02-09 14:27:23,204: INFO/MainProcess] Task engineapp.tasks.scan_chunk[35b74e01-f9fa-471f-8c20-ecbf99a89201] succeeded in 17.4779999256s: [
I would also like to create another task that runs scan_user sequentially for all users, but I think this is not possible, since the calls would in fact run in parallel. Roughly, I have something like the sketch below in mind.
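For illustration, a minimal sketch of that sequential version (scan_all_users and user_ids are placeholder names, not real code from my project):

from celery import chain, task

@task()
def scan_all_users(user_ids):
    # .si() makes each signature immutable, so one user's result is not
    # fed as an argument into the next scan_user call.
    return chain(scan_user.si(uid) for uid in user_ids)()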
ch() just runs the chain without waiting for the result. If you want to wait, do:
ch = chain(scan_chunk.si([x.id for x in chunk]) for chunk in chunks)()
return ch.get()  # blocks until the whole chain has finished
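Note that waiting on a subtask's result from inside another task can block the worker, and newer Celery versions refuse to do it by default. If the goal is to use scan_user inside a larger chain, one alternative sketch (assuming Celery >= 4, where Task.replace is available) lets the chain of chunks stand in for the task itself:

from celery import chain, task

@task(bind=True)
def scan_user(self, user_id):
    chunks = generate_chunks_from_user(user_id)
    workflow = chain(scan_chunk.si([x.id for x in chunk]) for chunk in chunks)
    # Replace scan_user with the chain: anything chained after scan_user
    # will only run once the last scan_chunk has finished.
    raise self.replace(workflow)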