Reputation: 4417
I'm using airflow.operators.sensors.ExternalTaskSensor to make one DAG wait for another.
dag = DAG(
    'dag2',
    default_args={
        'owner': 'Me',
        'depends_on_past': False,
        'start_date': start_datetime,
        'email': ['[email protected]'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=10),
    },
    template_searchpath="%s/me/resources/" % DAGS_FOLDER,
    schedule_interval="{} {} * * *".format(minute, hour),
    max_active_runs=1
)

wait_for_dag1 = ExternalTaskSensor(
    task_id='wait_for_dag1',
    external_dag_id='dag1',
    external_task_id='dag1_task1',
    dag=dag
)
If something goes seriously wrong with the upstream DAG and it fails to complete within a given time period, I want the downstream DAG (the ExternalTaskSensor task) to fail as well, instead of hanging forever.
How can I add a timeout to ExternalTaskSensor?
I've looked through the documentation, but it doesn't seem to have a timeout parameter or anything similar. What should I do?
https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html
Upvotes: 7
Views: 13507
Reputation: 136
ExternalTaskSensor does take a timeout argument, in seconds. It inherits the argument from BaseSensorOperator (https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/base/index.html). If you pass timeout=60 on instantiation, the sensor will fail once 60 seconds have passed without the external task succeeding.
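Applied to the sensor from the question, that could look roughly like the sketch below. The two-hour limit, the SENSOR_KWARGS name and the make_wait_for_dag1 helper are illustrative choices of mine, not something from the question. The import path assumes Airflow 2.x; older releases expose the class from airflow.sensors.external_task_sensor or airflow.operators.sensors, so adjust it to your version.

```python
from datetime import timedelta

# Sensor settings. Both `timeout` and `poke_interval` are plain seconds
# and come from BaseSensorOperator. The two-hour limit is only an example.
SENSOR_KWARGS = dict(
    task_id="wait_for_dag1",
    external_dag_id="dag1",
    external_task_id="dag1_task1",
    timeout=int(timedelta(hours=2).total_seconds()),  # give up after 2 hours
    poke_interval=60,  # check the external task once a minute
)


def make_wait_for_dag1(dag):
    """Attach the waiting sensor to `dag` (hypothetical helper)."""
    # Assumed import path for Airflow 2.x; imported here so the settings
    # above can be inspected without Airflow installed.
    from airflow.sensors.external_task import ExternalTaskSensor

    return ExternalTaskSensor(dag=dag, **SENSOR_KWARGS)
```

In the DAG file you would then call wait_for_dag1 = make_wait_for_dag1(dag). If you don't set timeout at all, the sensor falls back to the BaseSensorOperator default of seven days, which is probably why it looks like it hangs forever.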
Upvotes: 12