csukcl

Reputation: 193

Airflow DAG trigger wait_for_completion not working as expected?

I have multiple DAGs that depend on initial_dag running first. After that, I'd like the dependent DAGs to run one after another. Here is what I have:

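# Airflow 1.10.x import paths
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator

dag = DAG(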
    dag_id=DAG_NAME,
    default_args=default_args,
    schedule_interval=None,
    start_date=airflow.utils.dates.days_ago(1)
)

initial_dag = BashOperator(
    task_id='initial_dag',
    bash_command="python /home/airflow/gcs/dags/task.py",
    dag=dag
)

dependent_dag1 = TriggerDagRunOperator(
    task_id="dependent_dag1",
    trigger_dag_id="dependent_dag1",
    wait_for_completion=True,
    dag=dag
)

dependent_dag2 = TriggerDagRunOperator(
    task_id="dependent_dag2",
    trigger_dag_id="dependent_dag2",
    wait_for_completion=True,
    dag=dag
)

dependent_dag3 = TriggerDagRunOperator(
    task_id="dependent_dag3",
    trigger_dag_id="dependent_dag3",
    wait_for_completion=True,
    dag=dag
)

initial_dag >> dependent_dag1 >> dependent_dag2 >> dependent_dag3

I thought wait_for_completion=True would make each triggered DAG finish its run before the next one is triggered. For example: initial_dag runs and completes, then dependent_dag1 is triggered, and the subsequent tasks wait for it to complete before running.

The DAGs are triggered in the correct order, but each trigger doesn't seem to wait for the previous DAG to complete first: for example, dependent_dag2 is triggered before dependent_dag1 has finished.

Am I missing something here?

Upvotes: 1

Views: 13796

Answers (3)

glincow

Reputation: 111

If you need to wait for the previous DAG to complete first, consider using ExternalTaskSensor instead of TriggerDagRunOperator.

See documentation here: https://airflow.apache.org/docs/apache-airflow/1.10.3/_api/airflow/sensors/external_task_sensor/index.html

This sensor waits until another DAG (or a task in another DAG) has completed with a specified status (default "success") before moving forward.
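
To make the idea concrete, here is a minimal, framework-free sketch of what a sensor does: poke a state source until it reports an allowed state. This is not Airflow's implementation; the function name, the `failed_states` argument and the `get_state` callback are illustrative assumptions. In Airflow, ExternalTaskSensor instead looks up the external DAG's (or task's) run in the metadata database, by default the run with the same execution date as the sensor.

```python
import time
from typing import Callable, Iterable


def wait_for_external_state(
    get_state: Callable[[], str],
    allowed_states: Iterable[str] = ("success",),
    failed_states: Iterable[str] = ("failed",),
    poke_interval: float = 0.0,
    timeout: float = 60.0,
) -> str:
    """Poll get_state() until it reports an allowed state (toy sensor loop).

    Raises RuntimeError if a failed state is seen first, and TimeoutError if
    no allowed state is seen within `timeout` seconds.
    """
    allowed = set(allowed_states)
    failed = set(failed_states)
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()  # stands in for the sensor's lookup of the external run
        if state in allowed:
            return state
        if state in failed:
            raise RuntimeError(f"external run ended in state {state!r}")
        if time.monotonic() >= deadline:
            raise TimeoutError("external run did not reach an allowed state in time")
        time.sleep(poke_interval)  # wait before poking again


# Usage: a fake external run that is still running twice, then succeeds.
fake_states = iter(["running", "running", "success"])
print(wait_for_external_state(lambda: next(fake_states)))  # success
```

The downstream task only starts once this loop returns, which is the "wait for the previous DAG" behaviour the question is after.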

Upvotes: 0

Josh Fell

Reputation: 3589

This was answered on the Airflow GitHub discussion board; I'm posting the answer here as well to bring both threads together for other users.

Unfortunately, the wait_for_completion parameter isn't available in Airflow 1.10.x (see documentation); there it is treated as a generic keyword argument and ignored. The parameter is available starting with Airflow 2.0.
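
A toy illustration (plain Python, not Airflow's source) of how a parameter can be "accepted" yet do nothing: a constructor that collects unknown keyword arguments in **kwargs never acts on them. All class and function names here are hypothetical; the second class models a version where wait_for_completion is a declared parameter.

```python
class ToyBaseOperator:
    """Hypothetical base class that accepts arbitrary extra keyword arguments."""

    def __init__(self, task_id, **kwargs):
        self.task_id = task_id
        # Extra kwargs are kept only so we can inspect them; no logic ever reads them.
        self.ignored_kwargs = dict(kwargs)


class ToyTriggerOperatorOld(ToyBaseOperator):
    """Models a trigger operator that does NOT declare wait_for_completion."""

    def __init__(self, task_id, trigger_dag_id, **kwargs):
        super().__init__(task_id, **kwargs)  # wait_for_completion lands in **kwargs
        self.trigger_dag_id = trigger_dag_id


class ToyTriggerOperatorNew(ToyBaseOperator):
    """Models a trigger operator that declares wait_for_completion explicitly."""

    def __init__(self, task_id, trigger_dag_id, wait_for_completion=False, **kwargs):
        super().__init__(task_id, **kwargs)
        self.trigger_dag_id = trigger_dag_id
        self.wait_for_completion = wait_for_completion


def waits_for_triggered_run(op):
    # Only an operator that declared the parameter has anything to act on.
    return getattr(op, "wait_for_completion", False)


old = ToyTriggerOperatorOld("dependent_dag1", "dependent_dag1", wait_for_completion=True)
new = ToyTriggerOperatorNew("dependent_dag1", "dependent_dag1", wait_for_completion=True)
print(waits_for_triggered_run(old), old.ignored_kwargs)  # False {'wait_for_completion': True}
print(waits_for_triggered_run(new), new.ignored_kwargs)  # True {}
```

No error is raised in the "old" case, which is why the DAG file loads fine even though the flag has no effect.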

Upvotes: 2

bruno-uy

Reputation: 1875

The TriggerDagRunOperator task is marked as completed as soon as it has successfully triggered the DAG, so unless wait_for_completion actually takes effect (it only exists from Airflow 2.0 onward), the task won't wait for that DAG run to finish.
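
A toy timing model of this behaviour (plain Python, not Airflow; durations are made-up units, and scheduler latency and concurrency limits are ignored): when each trigger task is complete as soon as it has created the DAG run, the next trigger fires immediately and the dependent DAGs overlap. Only a trigger task that blocks until the triggered run ends serializes them.

```python
def simulate_trigger_chain(durations, wait_for_completion):
    """Return {dag_id: (start, end)} for DAGs triggered by a linear chain of trigger tasks.

    Toy timing model: creating a DAG run takes no time, and each triggered
    run lasts durations[dag_id] units once started.
    """
    clock = 0  # time at which the next trigger task runs
    timeline = {}
    for dag_id, duration in durations.items():
        start = clock
        end = start + duration
        timeline[dag_id] = (start, end)
        if wait_for_completion:
            # The trigger task only finishes when the triggered run ends,
            # so the next trigger task starts at that point.
            clock = end
        # Otherwise the trigger task finishes as soon as the run is created,
        # and the next trigger task starts right away.
    return timeline


durations = {"dependent_dag1": 5, "dependent_dag2": 3, "dependent_dag3": 2}
print(simulate_trigger_chain(durations, wait_for_completion=False))
# {'dependent_dag1': (0, 5), 'dependent_dag2': (0, 3), 'dependent_dag3': (0, 2)}
print(simulate_trigger_chain(durations, wait_for_completion=True))
# {'dependent_dag1': (0, 5), 'dependent_dag2': (5, 8), 'dependent_dag3': (8, 10)}
```

In the first result dependent_dag2 starts at time 0, before dependent_dag1 ends at 5, which matches what the question describes.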

You have at least two options:

  1. Use an ExternalTaskSensor between the trigger tasks to wait for the last task of the previous DAG.
  2. Put a TriggerDagRunOperator at the end of each dependent DAG. For example, the last task of dependent_dag1 would be a TriggerDagRunOperator that runs dependent_dag2, and so on.

Your choice will mainly depend on whether you can modify the dependent DAGs (required for option 2) and how much flexibility you want. With option 1 you need to keep track of the last task of each dependent DAG, but it is still the more flexible approach.
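
Option 2 can be pictured with a small toy model (plain Python, not Airflow; the task names extract, transform and report are hypothetical). The model assumes each DAG's trigger task is downstream of all its other tasks, so the next DAG cannot start until everything before it in the current DAG has run. A trigger placed earlier in a real DAG would let the next DAG overlap with the remaining tasks, which this model does not capture.

```python
from collections import deque


def run_chained_dags(first_dag_id, dags):
    """Toy scheduler for option 2: each DAG's final task triggers the next DAG.

    `dags` maps dag_id -> ordered list of tasks. A task is either a plain
    name or a ("trigger", target_dag_id) tuple standing in for a
    TriggerDagRunOperator placed as the DAG's final task.
    Returns the execution log in order.
    """
    log = []
    pending = deque([first_dag_id])
    while pending:
        dag_id = pending.popleft()
        for task in dags[dag_id]:
            if isinstance(task, tuple) and task[0] == "trigger":
                target = task[1]
                log.append(f"{dag_id}: trigger {target}")
                # Queued, not run inline: the target starts after this DAG's tasks are done.
                pending.append(target)
            else:
                log.append(f"{dag_id}: {task}")
    return log


dags = {
    "dependent_dag1": ["extract", ("trigger", "dependent_dag2")],
    "dependent_dag2": ["transform", ("trigger", "dependent_dag3")],
    "dependent_dag3": ["report"],
}
for line in run_chained_dags("dependent_dag1", dags):
    print(line)
```

The log shows dependent_dag2's work only after dependent_dag1's trigger, and dependent_dag3's only after dependent_dag2's, without any sensor in the parent DAG.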

Upvotes: 1
