Omroth
Omroth

Reputation: 1119

Celery - chained tasks performed out of order

I am sending a chain of three tasks to my celery workers. The first and third are added to the queue "filestore", which is served by worker A. The second is added to the queue "cloud" which is served by worker B.

The behaviour I want is for the three tasks to execute in order, one after the other.

The behavour I am seeing is that worker A does task 1, then task 3, then worker B does task 2.

result = app.send_task(
                        "workerTasks_filestore.task_upload_scan_to_s3", args=[scan.scan_name], queue='filestore',
                        chain=[
                            Signature('workerTasks.do_processing_task', args=[scan.scan_name, spd_name], queue=queue, immutable=True),
                            Signature('workerTasks_filestore.task_download_scan_from_s3', args=[scan.scan_name], queue='filestore', immutable=True),
                        ]
)

What am I doing wrong?

Upvotes: 2

Views: 1034

Answers (1)

Patricio
Patricio

Reputation: 423

Did you try using the chain class from celery?

from celery import chain, Signature

chained_tasks = chain([
    Signature('workerTasks_filestore.task_upload_scan_to_s3', args=(scan.scan_name,), queue='filestore'),
    Signature('workerTasks.do_processing_task', args=(scan.scan_name, spd_name,), queue=queue, immutable=True),
    Signature('workerTasks_filestore.task_download_scan_from_s3', args=(scan.scan_name,), queue='filestore', immutable=True)
])

result = chained_tasks.apply_async()

Upvotes: 1

Related Questions