Reputation: 2388
I just joined a new company and I'm trying to learn Airflow, as they use it. So far I've got the basics of most things down except external task sensors.
I have two DAGs: DAG A with a schedule interval of "0 6 * * *"
and DAG B with a schedule interval of "0 7 * * *".
DAG A waits for DAG B to complete before it continues. However, DAG B sometimes takes 3 hours to complete and at other times 10+ hours.
I created an ExternalTaskSensor as shown below, but it never triggers and times out even when DAG B is complete.
ExternalTaskSensor(
    task_id="wait_sensor",
    external_dag_id="dag_b",
    external_task_id="end",
    poke_interval=60 * 30,
    timeout=60 * 60,
    retries=10,
    execution_delta=timedelta(hours=2),
    dag=dag,
)
How can I properly set up the sensor?
Upvotes: 7
Views: 23758
Reputation: 636
The execution date of DAG A is one hour before DAG B's, and you set the execution delta to 2 hours. That means DAG A's external task sensor is looking for a DAG B run with an execution date of 0 4 * * *, which doesn't exist, so the sensor task fails on timeout. You can set check_existence=True
to fail immediately instead of waiting through 10 retries.
Also, the external task sensor log shows which external execution date it is poking for:
[2022-12-02, 08:21:36 UTC] {external_task.py:206} INFO - Poking for tasks ['test_task'] in dag test_dag on 2022-12-02T08:25:00+00:00 ...
Solution:
From the sensor docs, "For yesterday, use [positive!] datetime.timedelta(days=1)"
So to resolve this, you need to provide a negative execution_delta of 1 hour, as the execution date of DAG A is exactly 1 hour before DAG B's:
execution_delta=timedelta(hours=-1)
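To see why, the delta arithmetic can be sketched in plain Python (the date below is illustrative; the schedules are the ones from the question). The sensor pokes for an external run at its own logical date minus execution_delta:

```python
from datetime import datetime, timedelta

# DAG A's logical date for one run of "0 6 * * *" (illustrative date)
dag_a_date = datetime(2022, 12, 2, 6, 0)

# The sensor looks for the external run at: own logical date - execution_delta
wrong_target = dag_a_date - timedelta(hours=2)    # 04:00 -- no DAG B run exists there
right_target = dag_a_date - timedelta(hours=-1)   # 07:00 -- matches DAG B's "0 7 * * *"
```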
Upvotes: 9
Reputation: 111
I found this a bit tricky in Airflow, so I decided to write a function that reads the execution date from the external task. I dynamically generate task names in my DAG, using a text separator string so that I can later read the external DAG and task name back out inside my get_context
function. This is what the waiting task looks like:
def wait_for_another_task(dag_name, task_name, table_name):
    task = ExternalTaskSensor(
        task_id=f"{table_name}{WAITING_TASK_TEXT_SEPARATOR}{dag_name}.{task_name}",
        external_dag_id=dag_name,
        external_task_id=task_name,
        timeout=60 * 60,  # timeout is in seconds, so 60 * 60 = 1 hour
        allowed_states=['success'],
        failed_states=['failed', 'skipped'],
        execution_date_fn=get_context,
        mode='reschedule',
    )
    return task
In my case, WAITING_TASK_TEXT_SEPARATOR = '_wait_for_', and here is my get_context
function:
def get_context(dt, **kwargs):
    task_instance_str = str(kwargs["task_instance"])
    # look for the "_wait_for_" string, as it is the separator before the external DAG name
    starting_loc_of_wait_for = task_instance_str.find(WAITING_TASK_TEXT_SEPARATOR)
    len_of_wait_for = len(WAITING_TASK_TEXT_SEPARATOR)
    ending_loc_of_wait_for = starting_loc_of_wait_for + len_of_wait_for
    beginning_of_dag_task = task_instance_str[ending_loc_of_wait_for:]
    ending_of_dag_task_loc = beginning_of_dag_task.find(" ")
    dag_task = beginning_of_dag_task[:ending_of_dag_task_loc]
    dag_task_lst = dag_task.split('.')
    dag_name = dag_task_lst[0]
    task_name = dag_task_lst[1]
    # requires: from airflow.models import DagRun
    dag_runs = DagRun.find(dag_id=dag_name)
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    if dag_runs:
        last_exec_date = dag_runs[0].execution_date
        return last_exec_date
    else:
        return dt
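The string surgery in get_context can be checked outside Airflow. Assuming a TaskInstance repr roughly like the one below (the exact format is an assumption and may differ between Airflow versions), the separator trick recovers the external DAG and task name:

```python
WAITING_TASK_TEXT_SEPARATOR = '_wait_for_'

# Hypothetical str(task_instance) -- the real repr may differ between Airflow versions
task_instance_str = "<TaskInstance: my_dag.table1_wait_for_dag_b.end scheduled__2022-12-02T06:00:00 [running]>"

# Everything between the separator and the next space is "<dag_name>.<task_name>"
sep_end = task_instance_str.find(WAITING_TASK_TEXT_SEPARATOR) + len(WAITING_TASK_TEXT_SEPARATOR)
dag_task = task_instance_str[sep_end:].split(" ")[0]   # "dag_b.end"
dag_name, task_name = dag_task.split('.')[:2]
```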
ExternalTaskSensor has an execution_date_fn parameter (https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor) that receives the current execution's logical date, so if there are no previous external DAG runs, we can still fall back to the current DAG's run.
With that in place, you only need to tweak timeout
in the ExternalTaskSensor
and don't need to worry about timedeltas and so on.
Upvotes: 0
Reputation: 9798
First of all, timeout
is set to 1 hour: timeout=60*60 is in seconds, so the sensor will fail 1 hour after it starts, while DAG B can take 10+ hours.
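A quick sanity check on those numbers (the intervals are the ones from the question's sensor):

```python
poke_interval = 60 * 30  # 30 minutes, from the question
timeout = 60 * 60        # 1 hour, from the question

# Only 2 checks fit before the sensor gives up -- far too few
# for a DAG that can run for 10+ hours
pokes_before_timeout = timeout // poke_interval
```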
Also, I recommend using reschedule
mode instead of poke.
From the docs: The poke and reschedule modes can be configured directly when you instantiate the sensor; generally, the trade-off between them is latency. Something that is checking every second should be in poke mode, while something that is checking every minute should be in reschedule mode.
You can try something like this:
ExternalTaskSensor(
    task_id="wait_sensor",
    external_dag_id="dag_b",
    external_task_id="end",
    mode="reschedule",
    timeout=60 * 60 * 23,
    retries=10,
    execution_delta=timedelta(hours=2),
    dag=dag,
)
Upvotes: 0