Reputation: 1119
I am sending a chain of three tasks to my celery workers. The first and third are added to the queue "filestore", which is served by worker A. The second is added to the queue "cloud" which is served by worker B.
The behaviour I want is for the three tasks to execute in order, one after the other.
The behavour I am seeing is that worker A does task 1, then task 3, then worker B does task 2.
result = app.send_task(
"workerTasks_filestore.task_upload_scan_to_s3", args=[scan.scan_name], queue='filestore',
chain=[
Signature('workerTasks.do_processing_task', args=[scan.scan_name, spd_name], queue=queue, immutable=True),
Signature('workerTasks_filestore.task_download_scan_from_s3', args=[scan.scan_name], queue='filestore', immutable=True),
]
)
What am I doing wrong?
Upvotes: 2
Views: 1034
Reputation: 423
Did you try using the chain class from celery?
from celery import chain, Signature
chained_tasks = chain([
Signature('workerTasks_filestore.task_upload_scan_to_s3', args=(scan.scan_name,), queue='filestore'),
Signature('workerTasks.do_processing_task', args=(scan.scan_name, spd_name,), queue=queue, immutable=True),
Signature('workerTasks_filestore.task_download_scan_from_s3', args=(scan.scan_name,), queue='filestore', immutable=True)
])
result = chained_tasks.apply_async()
Upvotes: 1