codeBarer

Reputation: 2388

What is the proper way to set external task sensor in Airflow with different scheduled interval?

I just joined a new company and I'm trying to learn Airflow as it's used here. So far I've got the basics of most things down except External Task Sensors.

I have two DAGs: DAG A with a schedule interval of "0 6 * * *" and DAG B with a schedule interval of "0 7 * * *". DAG A waits for DAG B to complete before it continues. However, DAG B sometimes takes 3 hours to complete and at other times 10+ hours.

I created an ExternalTaskSensor as shown below, but it never triggers and times out even when DAG B is complete.

ExternalTaskSensor(
    task_id="wait_sensor",
    external_dag_id="dag_b",
    external_task_id="end",
    poke_interval=60 * 30,
    timeout=60 * 60,
    retries=10,
    execution_delta=timedelta(hours=2),
    dag=dag,
)

How can I properly set up the sensor?

Upvotes: 7

Views: 23758

Answers (3)

Chen Meyouhas

Reputation: 636

The execution date of DAG A is one hour before DAG B's, and you set the execution delta to 2 hours, meaning DAG A's external sensor is trying to find a DAG B run with an execution date of 0 4 * * *, which doesn't exist. In this case, your external sensor task fails on timeout. You could set check_existence=True to fail immediately instead of waiting through 10 retries. Also, you can see in the external task sensor log which external task execution date it is poking:

[2022-12-02, 08:21:36 UTC] {external_task.py:206} INFO - Poking for tasks ['test_task'] in dag test_dag on 2022-12-02T08:25:00+00:00 ...

Solution:

From the sensor docs, "For yesterday, use [positive!] datetime.timedelta(days=1)"

So to resolve this, you need to provide a negative execution_delta of 1 hour, as the execution date of DAG A is exactly 1 hour before DAG B's:

execution_delta=timedelta(hours=-1)
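The sign is easy to reason about: the sensor pokes for an external run whose execution date equals its own logical date minus execution_delta. A minimal sketch of that arithmetic, using the schedules from the question (the helper function is illustrative, not part of the Airflow API):

```python
from datetime import datetime, timedelta

# The sensor looks for an external DAG run at: own_logical_date - execution_delta
def poked_execution_date(own_logical_date, execution_delta):
    return own_logical_date - execution_delta

# DAG A's run for a given day has a logical date of 06:00
dag_a_logical = datetime(2022, 12, 2, 6, 0)

# execution_delta=timedelta(hours=2) pokes for a 04:00 run of DAG B -- no such run
print(poked_execution_date(dag_a_logical, timedelta(hours=2)))   # 2022-12-02 04:00:00

# execution_delta=timedelta(hours=-1) pokes for the 07:00 run, matching DAG B's schedule
print(poked_execution_date(dag_a_logical, timedelta(hours=-1)))  # 2022-12-02 07:00:00
```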

Upvotes: 9

Michal Volešíni

Reputation: 111

I found this a bit tricky in Airflow, so I decided to write a function that reads the execution date from your external task. I dynamically generate task names in my DAG; this is what the waiting task looks like. It uses a text separator string so that my get_context function can later read the external DAG and task name out of the task_id:

def wait_for_another_task(dag_name, task_name, table_name):
    task = ExternalTaskSensor(
        task_id=f"{table_name}{WAITING_TASK_TEXT_SEPARATOR}{dag_name}.{task_name}",
        external_dag_id=dag_name,
        external_task_id=task_name,
        timeout=60 * 60,  # timeout is in seconds, so 60 * 60 = 1 hour
        allowed_states=['success'],
        failed_states=['failed', 'skipped'],
        execution_date_fn=get_context,
        mode='reschedule',
    )
    return task

In my case, WAITING_TASK_TEXT_SEPARATOR = '_wait_for_', and here is my get_context function:

# DagRun is needed to look up the external DAG's runs
from airflow.models import DagRun

def get_context(dt, **kwargs):
    task_instance_str = str(kwargs["task_instance"])
    # look for the "_wait_for_" string, as it is the separator for the external dag
    starting_loc_of_wait_for = task_instance_str.find(WAITING_TASK_TEXT_SEPARATOR)
    len_of_wait_for = len(WAITING_TASK_TEXT_SEPARATOR)
    ending_loc_of_wait_for = starting_loc_of_wait_for + len_of_wait_for

    beginning_of_dag_task = task_instance_str[ending_loc_of_wait_for:]
    ending_of_dag_task_loc = beginning_of_dag_task.find(" ")
    dag_task = beginning_of_dag_task[:ending_of_dag_task_loc]
    dag_task_lst = dag_task.split('.')
    dag_name = dag_task_lst[0]
    task_name = dag_task_lst[1]

    # use the most recent run of the external DAG, if one exists
    dag_runs = DagRun.find(dag_id=dag_name)
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    if dag_runs:
        last_exec_date = dag_runs[0].execution_date
        return last_exec_date
    else:
        return dt

ExternalTaskSensor has an execution_date_fn parameter (https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor): Airflow calls the function with the current execution's logical date, so if there are no previous external DAG runs, we can still fall back to the current run of the current DAG. Based on that, you can just tweak the timeout in the ExternalTaskSensor and don't need to worry about timedeltas.
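If you'd rather not query DagRun at all, the same execution_date_fn hook can compute the external DAG's logical date directly from its known schedule. A small sketch, assuming (as in the question) the external DAG runs daily at 07:00; the function name and the hard-coded schedule are illustrative assumptions, not part of the answer above:

```python
from datetime import datetime, timedelta

def latest_external_logical_date(dt):
    """Return the most recent 07:00 logical date at or before dt.

    Intended to be passed as execution_date_fn=latest_external_logical_date;
    Airflow calls it with the sensing DAG's logical date.
    """
    candidate = dt.replace(hour=7, minute=0, second=0, microsecond=0)
    if candidate > dt:
        # 07:00 today is still in the future, so use yesterday's run
        candidate -= timedelta(days=1)
    return candidate

print(latest_external_logical_date(datetime(2022, 12, 2, 6, 0)))  # 2022-12-01 07:00:00
print(latest_external_logical_date(datetime(2022, 12, 2, 8, 0)))  # 2022-12-02 07:00:00
```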

Upvotes: 0

user16930239

Reputation: 9798

First of all, your timeout is set to 1 hour: if you pass timeout=60*60, the sensor will fail after 1 hour even though DAG B may still be running.

Also, I recommend using reschedule mode instead of poke:

The poke and reschedule modes can be configured directly when you instantiate the sensor; generally, the trade-off between them is latency. Something that is checking every second should be in poke mode, while something that is checking every minute should be in reschedule mode.

You can try something like this:

ExternalTaskSensor(
    task_id="wait_sensor",
    external_dag_id="dag_b",
    external_task_id="end",
    mode="reschedule",
    timeout=60 * 60 * 23,
    retries=10,
    execution_delta=timedelta(hours=2),
    dag=dag,
)

Upvotes: 0
