Reputation: 193
I have multiple DAGs that depend on initial_dag running first. After that, I'd like the dependent DAGs to run one after another. Here is what I have:
```python
# Airflow 1.10.x import paths
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator

# Placeholders: these definitions aren't shown in the question.
DAG_NAME = "controller_dag"
default_args = {"owner": "airflow"}

dag = DAG(
    dag_id=DAG_NAME,
    default_args=default_args,
    schedule_interval=None,
    start_date=airflow.utils.dates.days_ago(1),
)

initial_dag = BashOperator(
    task_id="initial_dag",
    bash_command="python /home/airflow/gcs/dags/task.py",
    dag=dag,
)

dependent_dag1 = TriggerDagRunOperator(
    task_id="dependent_dag1",
    trigger_dag_id="dependent_dag1",
    wait_for_completion=True,
    dag=dag,
)

dependent_dag2 = TriggerDagRunOperator(
    task_id="dependent_dag2",
    trigger_dag_id="dependent_dag2",
    wait_for_completion=True,
    dag=dag,
)

dependent_dag3 = TriggerDagRunOperator(
    task_id="dependent_dag3",
    trigger_dag_id="dependent_dag3",
    wait_for_completion=True,
    dag=dag,
)

initial_dag >> dependent_dag1 >> dependent_dag2 >> dependent_dag3
```
I thought wait_for_completion=True would make each triggered DAG finish its run before the next one is triggered. For example, initial_dag runs and completes, then dependent_dag1 is triggered, and the chain waits for it to complete before triggering the subsequent tasks.
The DAGs are triggered in the correct order, but the operator doesn't seem to wait for the previous DAG to complete first: for example, dependent_dag2 is triggered before dependent_dag1 has completed.
Am I missing something here?
Upvotes: 1
Views: 13796
Reputation: 111
If you need to wait for the previous DAG to complete first, consider using ExternalTaskSensor instead of TriggerDagRunOperator.
See documentation here: https://airflow.apache.org/docs/apache-airflow/1.10.3/_api/airflow/sensors/external_task_sensor/index.html
This sensor waits until another DAG (or a task in another DAG) has completed with a specified status (by default, "success") before moving forward.
Upvotes: 0
Reputation: 3589
This was answered on the Airflow GitHub discussion board; I'm summarizing it here to bring both threads together for other users.
Unfortunately, the wait_for_completion parameter isn't available in 1.10.x versions (see the documentation), so it is accepted as a generic kwarg and ignored. The parameter is available starting with Airflow 2.0.
Upvotes: 2
Reputation: 1875
The wait_for_completion parameter refers to the completion of the task, not of the DAG itself. The task is marked as complete once it has successfully triggered the DAG, so it won't wait for that DAG run to finish.
You have at least two options:

1. Use an ExternalTaskSensor between the trigger calls to wait for the last task of the previous DAG.
2. Put a TriggerDagRunOperator at the end of the dependent DAGs. For example, the last task of dependent_dag1 would be a TriggerDagRunOperator that runs dependent_dag2, and so on.

Your choice will mainly depend on whether you can change the DAGs (required for option 2) and how much flexibility you want. With option 1 you need to keep track of the last task of each dependent DAG, but it is still more flexible.
Upvotes: 1