Joe F.
Joe F.

Reputation: 867

Celery chain - if any tasks fail, do x, else y

I'm just getting into Celery chains in my Django project. I have the following function:

def orchestrate_tasks_for_account(account_id):

    # Get the account, set status to 'SYNC' until the chain is complete
    account = Account.objects.get(id=account_id)
    account.status = "SYNC"
    account.save()

    chain = task1.s(account_id) | task2.s() | task3.s()
    chain()

    # if any of the tasks in the chain failed, set account.status = 'ERROR'
    # else set the account.status = 'OK'

The chain works as expected, but I'm not sure how to take feedback from the chain and update the account based on the results

In other words, I'd like to set the account status to 'ERROR' if any of the tasks in the chain fail, otherwise I'd like to set the account status to 'OK'

I'm confused by the Celery documentation on how to handle an error with an if/else like I've commented in the last two lines above.

Does anyone have experience with this?

Upvotes: 0

Views: 903

Answers (1)

Joe F.
Joe F.

Reputation: 867

Ok - here's what I came up with

I've leveraged the waiting library in this solution

from celery import chain
from waiting import wait


def orchestrate_tasks_for_account(account_id):

    account = Account.objects.get(id=account_id)
    account.status = "SYNC"
    account.save()

    job = chain(
        task1.s(account_id),
        task2.s(),
        task3.s()
        )
    result = job.apply_async()

    wait(
        lambda: result.ready(), # when async job is completed...
        timeout_seconds=1800, # wait 1800 seconds (30 minutes)
        waiting_for="task orchestration to complete"
        )

    if result.successful():
        account.status = 'OK'
    else:
        account.status = 'ERROR'

    account.save()

I am open to suggestions to make this better!

Upvotes: 2

Related Questions