Wai Yan

Reputation: 582

ExternalTaskSensor with multiple dependencies in Airflow

I have dagA (cron 5am) and dagB (cron 6am). Both ingest data from somewhere and dump it into the data lake. Now I want dagC (an ETL job) to wait for both dagA and dagB to complete.

I am using an ExternalTaskSensor instead of a TriggerDagRunOperator since I don't believe the ingestion layer should trigger anything downstream. I've read answers to similar questions stating that I should run the DAGs at the same time.

This is the part that confuses me: if I follow that advice, does it mean all my Airflow jobs will start at the same time and the downstream jobs keep poking until the upstream ones are ready? Does it also mean dagA and dagB have to start at the same time even though they have no dependency on each other?

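from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

dagA = DAG('dagA', description='dagA',
          schedule_interval='0 5 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)
dagB = DAG('dagB', description='dagB',
          schedule_interval='0 6 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)
dagC = DAG('dagC', description='dagC',
          schedule_interval=None,
          start_date=datetime(2017, 3, 20), catchup=False)

# external_task_id=None: wait for the whole dagA run, not a single task
wait_for_dagA = ExternalTaskSensor(
    task_id='wait_for_dagA',
    external_dag_id='dagA',
    external_task_id=None,
    execution_delta=None,
    dag=dagC)  # the sensors belong to dagC (was the undefined name `dag`)

wait_for_dagB = ExternalTaskSensor(
    task_id='wait_for_dagB',
    external_dag_id='dagB',
    external_task_id=None,
    execution_delta=None,
    dag=dagC)

# Placeholder for the real ETL task, which the question does not show
etl_task = DummyOperator(task_id='etl_task', dag=dagC)

# etl_task runs only after both sensors have succeeded
[wait_for_dagA, wait_for_dagB] >> etl_task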

I am on airflow 1.10.3.

Upvotes: 0

Views: 6320

Answers (1)

y2k-shubham

Reputation: 11607

..does this mean all my airflow jobs will start at the same time and the downstream jobs keep poking until the upstream is ready?

  • Airflow jobs start at the same time only if you schedule them that way; there is no such requirement.
  • The downstream tasks (etl_task and its downstream dependencies) start only after both wait_for_dagA and wait_for_dagB have succeeded. Those waiting tasks keep poking (that's what sensors do) until the respective DAGs succeed, or until the sensor's timeout is reached.
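To make the poking behaviour concrete, here is a plain-Python sketch (not Airflow's actual implementation) of roughly what a sensor in the default 'poke' mode does: run a check, sleep poke_interval seconds, and repeat until the check passes or the timeout elapses. The defaults mirror BaseSensorOperator's (60 seconds and 7 days); the wait_for helper and the simulated upstream states are hypothetical.

```python
import time
from typing import Callable


def wait_for(poke: Callable[[], bool], poke_interval: float = 60.0,
             timeout: float = 7 * 24 * 3600) -> int:
    """Call `poke` until it returns True and return how many checks it took.

    Simplified model of a sensor in 'poke' mode: check, sleep, repeat,
    and fail with TimeoutError once `timeout` seconds have passed.
    """
    started = time.monotonic()
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return pokes
        if time.monotonic() - started >= timeout:
            raise TimeoutError(f"condition not met after {pokes} pokes")
        time.sleep(poke_interval)


# Hypothetical upstream states: "dagA" is done on the 3rd check, "dagB" on the 2nd.
dag_a_states = iter([False, False, True])
dag_b_states = iter([False, True])

# Run one after the other here for simplicity; in Airflow the two sensor
# tasks run in parallel, and etl_task waits for both of them.
pokes_a = wait_for(lambda: next(dag_a_states), poke_interval=0.01)
pokes_b = wait_for(lambda: next(dag_b_states), poke_interval=0.01)
print("run etl_task after", pokes_a, "and", pokes_b, "pokes")
```

The downstream step is reached only once both calls have returned, which is the same gating that `[wait_for_dagA, wait_for_dagB] >> etl_task` expresses.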

Does that also mean dagA and dagB have to start at the same time even though they have no dependency between each other?

As stated above, this is not a requirement. The whole point of replacing crons with DAGs is that you don't need to time your tasks precisely; instead, you can force them to run one after another regardless of different start times, execution times and unexpected delays.
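One caveat for this particular setup: ExternalTaskSensor looks up the external DAG run (or task instance) whose execution_date equals the sensor's own execution_date, shifted back by execution_delta if given (or computed by execution_date_fn). With dagC on schedule_interval=None, a manually triggered run gets the trigger time as its execution_date, which will generally not match any dagA or dagB run. The plain-Python sketch below (no Airflow import) assumes dagC is given a hypothetical daily '0 7 * * *' schedule and works out the execution_delta each sensor would then need; the helper names are made up for illustration. Because all three DAGs are daily, the offsets between their execution_dates equal the differences between their cron hours.

```python
from datetime import datetime, timedelta

# Hypothetical: dagC gets a daily '0 7 * * *' schedule so its execution_date
# can be lined up with the upstream DAGs.
DAGC_HOUR = 7
UPSTREAM_HOURS = {"dagA": 5, "dagB": 6}  # hours from the cron schedules in the question


def execution_delta_for(upstream_hour: int, downstream_hour: int) -> timedelta:
    """Offset to pass as execution_delta so the sensor targets the same day's upstream run."""
    return timedelta(hours=downstream_hour - upstream_hour)


def target_execution_date(own_execution_date: datetime, delta: timedelta) -> datetime:
    """Execution date the sensor looks for: its own execution_date minus execution_delta."""
    return own_execution_date - delta


dagc_run = datetime(2017, 3, 20, DAGC_HOUR)
for dag_id, hour in UPSTREAM_HOURS.items():
    delta = execution_delta_for(hour, DAGC_HOUR)
    print(dag_id, delta, target_execution_date(dagc_run, delta))
# dagA 2:00:00 2017-03-20 05:00:00
# dagB 1:00:00 2017-03-20 06:00:00
```

Under that assumed schedule, the DAG file would pass execution_delta=timedelta(hours=2) to wait_for_dagA and execution_delta=timedelta(hours=1) to wait_for_dagB; execution_date_fn is the alternative when a fixed offset is not enough.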


Tips

  • Have a look at the different poking behaviours configurable via the mode param ('poke' or 'reschedule')
  • Also check out the other params available in ExternalTaskSensor (e.g. execution_delta, execution_date_fn, allowed_states)
  • If you are not specifying external_task_id in your sensor(s), beware of pitfalls like this
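The practical difference between the two mode values is how long a waiting sensor occupies a worker slot: in 'poke' mode the task stays on the worker and sleeps between checks, while in 'reschedule' mode it frees the slot after each unsuccessful check and is re-queued later. The toy model below is a plain-Python illustration of that trade-off, not Airflow code; simulate, SlotUsage and the one-hour scenario are hypothetical, and the per-check cost is an assumed 1 second.

```python
import math
from dataclasses import dataclass


@dataclass
class SlotUsage:
    pokes: int           # number of checks performed
    slot_seconds: float  # seconds a worker slot is occupied by the sensor


def simulate(mode: str, wait_seconds: float, poke_interval: float,
             poke_cost: float = 1.0) -> SlotUsage:
    """Toy model of worker-slot usage while a sensor waits `wait_seconds`.

    Checks happen at t = 0, poke_interval, 2 * poke_interval, ... and each
    takes `poke_cost` seconds; the first check at or after `wait_seconds`
    succeeds.
    """
    if mode not in ("poke", "reschedule"):
        raise ValueError(f"unknown mode: {mode!r}")
    pokes = math.ceil(wait_seconds / poke_interval) + 1
    if mode == "poke":
        # The task stays on the worker and sleeps between checks,
        # so the slot is held for every check and every sleep.
        slot = pokes * poke_cost + (pokes - 1) * poke_interval
    else:
        # The task gives its slot back after each failed check and is
        # re-queued later, so the slot is held only while checking.
        slot = pokes * poke_cost
    return SlotUsage(pokes, slot)


# Hypothetical scenario: dagB finishes one hour after dagC's sensor starts.
for mode in ("poke", "reschedule"):
    print(mode, simulate(mode, wait_seconds=3600, poke_interval=60))
```

In this model both modes make the same number of checks, but 'poke' holds a slot for about an hour while 'reschedule' holds it for about a minute, which matters when many sensors wait at once.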

Upvotes: 1
