Alessandro S.

Reputation: 1043

Trigger a task when another one has finished successfully

My use-case is as follows:

All three tasks are scheduled to run daily. Task B and Task C are scheduled far enough after Task A that its output should already exist, and they simply fail if the input dataset has not been generated for some reason.

As a first improvement, I added an ExternalTaskSensor to both Task B and Task C, but this only prevents them from running if Task A has not finished yet or has failed.

However, ExternalTaskSensor does not seem to work well with backfilling: it is fragile because it matches on the execution date only, and if Task A is run again, Task B and Task C are not notified.
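To make the "execution date only" point concrete, here is a toy model in plain Python (no Airflow involved, and a simplification of the real sensor). With execution_delta set, the sensor checks only the upstream run whose logical date equals the consumer's logical date minus the delta. In Airflow 2.x a manual trigger gets the trigger time as its logical date by default, so such a run is never matched. The schedules (dag_a at 00:00, dag_b at 06:00) and the dates are assumptions for illustration.

```python
from datetime import datetime, timedelta


def sensor_target(consumer_logical_date: datetime, execution_delta: timedelta) -> datetime:
    """Logical date the sensor would look up in the upstream DAG (simplified)."""
    return consumer_logical_date - execution_delta


def sensor_finds_run(consumer_logical_date, execution_delta, upstream_run_dates):
    """True only if an upstream run has exactly the target logical date."""
    return sensor_target(consumer_logical_date, execution_delta) in upstream_run_dates


# Assumed schedules: dag_a runs @daily (00:00), dag_b runs daily at 06:00.
delta = timedelta(hours=6)
b_run = datetime(2024, 1, 2, 6, 0)

scheduled_a = {datetime(2024, 1, 2, 0, 0)}   # regular scheduled run of Task A
manual_a = {datetime(2024, 1, 2, 10, 37)}    # manual trigger: logical date = trigger time

print(sensor_finds_run(b_run, delta, scheduled_a))  # True
print(sensor_finds_run(b_run, delta, manual_a))     # False
```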

Solution 1 (not applicable): I have seen this SO question: "In airflow, is there a good way to call another dag's task?"

This is not ideal for me because I'd like to keep Task A unaware of the dependent tasks and handle the logic in Task B and Task C (or externally). The reason is that other tasks consuming the output of Task A will be added in the future (by different teams across the organization), and it's not desirable to update Task A each time.

Summary: I'd like to trigger Task B and Task C if and only if Task A has completed successfully (regardless of whether it was scheduled or triggered manually), without modifying Task A to achieve that.

Upvotes: 0

Views: 1413

Answers (1)

nightgaunt

Reputation: 910

For your scenario, the only concept I can think of is SubDags (read the WARNING before you implement this).

SubDagOperator lets you attach a set of tasks downstream of task A. Refer to the code below.

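from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

# load_subdag lives in a separate file (shown below); this module name is hypothetical
from dependent_tasks_subdag import load_subdag

# Use a fixed start_date: datetime.now() changes every time the file is parsed
default_args = {'start_date': datetime(2019, 1, 1)}

dag = DAG('parent_dag', description='Parent',
          schedule_interval='@daily',
          default_args=default_args)

task_a = DummyOperator(dag=dag, task_id='task_a')

# The subdag's dag_id must be '<parent_dag_id>.<task_id>', so the task_id
# has to match the child name: 'parent_dag.dependent_tasks'
subdag_task = SubDagOperator(
    task_id='dependent_tasks',
    subdag=load_subdag('parent_dag', 'dependent_tasks', default_args),
    dag=dag)

task_a >> subdag_task

Now, in a separate file, define the load_subdag function. It receives the parent's default_args so that both DAGs share the same start_date.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator


def load_subdag(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )
    with dag_subdag:
        # Tasks created inside the "with" block are attached to dag_subdag
        task_b = DummyOperator(task_id='load_subdag_task_b')
        task_c = DummyOperator(task_id='load_subdag_task_c')

    return dag_subdag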

WARNING: SubDagOperator tasks occupy worker slots for as long as their subdag runs, and can end up consuming all of them. Please understand the caveats completely before you jump into this. AIRFLOW-74 gives a picture of how bad it can be. Many developers reject SubDags outright for this reason.

Upvotes: 1
