Airflow: ExternalTaskSensor doesn't work as expected. Different task schedules

Colleagues, we need help. There are two DAGs, Parent and Child. The parent has its own schedule, say '30 * * * *', and the child runs on '1 8-17 * * 1-5'. The child should wait for the parent to finish (for up to 40 minutes, say); if the parent ends with an error, the child should fail with an error too, otherwise the child's next task should run. The problem is that this does not work even in the simplest case, and I don't understand how to synchronize them. I wrote code like this:

Dag Parent

import time
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor, ExternalTaskMarker

start_date = datetime(2021, 3, 1, 20, 36, 0)

class Exept(Exception):
    pass

def wait():
    time.sleep(3)
    with open('etl.txt', 'r') as txt:
        line = txt.readline()
        if line == 'err':
            print(1)
            raise Exept
    return 'etl success'


with DAG(
    dag_id="dag_etl1",
    start_date=start_date,
    schedule_interval='* * * * *',
    tags=['example2'],
) as etl1:
    parent_task = ExternalTaskMarker(
        task_id="parent_task",
        external_dag_id="dag_etl1",
        external_task_id="etl_child",
    )
    wait_timer = PythonOperator(task_id='wait_timer', python_callable=wait)
    
    wait_timer >> parent_task

Dag Child

from datetime import datetime, timedelta


from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor, ExternalTaskMarker

from etl_parent import etl1, wait_timer, parent_task

start_date = datetime(2021, 3, 1, 20, 36, 0)

def check():
    return 'I succeeded'

with DAG(
    dag_id='etl_child', 
    start_date=start_date, 
    schedule_interval='* * * * *',
    tags=['testing_child_dag']
) as etl_child:
    status = ExternalTaskSensor(
        task_id="dag_etl1",
        external_dag_id=etl1.dag_id,
        external_task_id=parent_task.task_id,
        allowed_states=['success'],
        mode='reschedule',
        execution_delta=timedelta(minutes=1),
        timeout=60,
    )

    task1 = PythonOperator(task_id='task1', python_callable=check)
    
    status >> task1

As you can see, I'm trying to emulate a situation where the parent task fails if 'err' is written in the text file and succeeds in any other case. But this does not work at all as I expect. On the first run of the DAGs everything works correctly. If I then change the data in the text file, the parent DAG still behaves correctly: for example, if I launch the parent with a deliberately induced error, the parent fails and the child fails as well, as expected. But if I then change the text back, the parent again runs correctly, while the child keeps failing for a while; eventually it may succeed, but not reliably. With a deliberately successful run the situation is the same, just the other way around. I also don't understand how to organize waiting for the parent DAG's task to complete.

Please help) I have only been working with Airflow for a short time, so I may be missing something.

Upvotes: 7

Views: 9823

Answers (2)

Nahid O.

Reputation: 301

It seems that your two DAGs (DAG A and DAG B) have completely different schedules. You may still want to use the ExternalTaskSensor to avoid concurrent-modification issues and to assert that DAG A is not currently running before triggering DAG B.

In that case, you can use the execution_date_fn parameter:

ExternalTaskSensor(
    task_id="sensor",
    external_dag_id="dag_id",
    execution_date_fn=get_most_recent_dag_run,
    mode="reschedule")

where the get_most_recent_dag_run function looks like this:

from airflow.models import DagRun

def get_most_recent_dag_run(dt):
    # Ignore the execution date the sensor passes in (dt) and return the
    # execution_date of the target DAG's most recent run instead.
    dag_runs = DagRun.find(dag_id="dag_id")
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    if dag_runs:
        return dag_runs[0].execution_date

get_most_recent_dag_run looks up the latest DAG run of the given dag_id, so the ExternalTaskSensor targets that run's execution_date instead of deriving one from its own schedule. This lets the sensor work even when the two DAGs' schedules never line up.
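Applied to the DAGs from the question, a minimal sketch of the child might look like this (the DAG and task ids are taken from the question; failed_states requires Airflow 2.0+ and makes the child fail as soon as the parent run has failed, and the 40-minute timeout is the waiting window the question describes):

from datetime import datetime

from airflow import DAG
from airflow.models import DagRun
from airflow.sensors.external_task_sensor import ExternalTaskSensor


def get_most_recent_dag_run(dt):
    # Ignore the child's own execution_date (dt) and target whatever
    # run of the parent DAG happened most recently.
    dag_runs = DagRun.find(dag_id="dag_etl1")
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    if dag_runs:
        return dag_runs[0].execution_date


with DAG(
    dag_id="etl_child",
    start_date=datetime(2021, 3, 1, 20, 36, 0),
    schedule_interval="1 8-17 * * 1-5",
) as etl_child:
    wait_for_parent = ExternalTaskSensor(
        task_id="wait_for_parent",
        external_dag_id="dag_etl1",
        external_task_id="wait_timer",
        execution_date_fn=get_most_recent_dag_run,
        allowed_states=["success"],
        failed_states=["failed"],  # Airflow 2.0+: fail the child as soon as the parent fails
        mode="reschedule",
        timeout=40 * 60,  # the ~40-minute window from the question
    )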

Upvotes: 8

Michael Korotkov

Reputation: 313

The most common cause of problems with ExternalTaskSensor is the execution_delta parameter, so I would start there.

I see that both your parent and child DAG have exactly the same start_date and schedule_interval, yet your execution_delta is 1 minute. In that case the child DAG looks for a parent run with execution date 20:35 (in your example), but the parent actually started at 20:36, so the sensor fails. Just as a test, try setting your parent DAG to start at 20:35 and see if that solves the problem.
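You can see the mismatch with a toy calculation (plain Python, not Airflow code; the dates are the ones from the question):

from datetime import datetime, timedelta

# The sensor probes for a parent run at (child execution_date - execution_delta).
child_execution_date = datetime(2021, 3, 1, 20, 36)
execution_delta = timedelta(minutes=1)
probed_parent_date = child_execution_date - execution_delta
print(probed_parent_date)  # 2021-03-01 20:35:00

# No parent run has execution_date 20:35 -- the parent's first run is at
# 20:36 -- so the sensor polls until its timeout and then fails.

Since the two schedules are identical, the simplest fix is to drop execution_delta altogether; it defaults to no offset, i.e. the same execution_date in both DAGs.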

Here's a good article that goes into a bit more detail about the schedule_interval pitfall: https://medium.com/@fninsiima/sensing-the-completion-of-external-airflow-tasks-827344d03142

As for the waiting time, that's your timeout parameter in ExternalTaskSensor; in your case the sensor gives up and fails after 60 seconds. Personally, I would be very cautious with long timeout periods: a sensor in the default 'poke' mode occupies a worker slot for its entire wait, so other tasks can be locked out of execution, especially if you have a lot of sensors. Your mode='reschedule' mitigates this by releasing the slot between checks.
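For the 40-minute wait the question describes, a sketch along these lines (DAG and task ids again borrowed from the question) keeps a long wait cheap:

from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

with DAG(
    dag_id="etl_child",
    start_date=datetime(2021, 3, 1, 20, 36, 0),
    schedule_interval="* * * * *",
) as etl_child:
    status = ExternalTaskSensor(
        task_id="wait_for_parent",
        external_dag_id="dag_etl1",
        external_task_id="wait_timer",
        mode="reschedule",   # release the worker slot between checks
        poke_interval=60,    # re-check the parent once per minute
        timeout=40 * 60,     # give up after 40 minutes rather than waiting forever
    )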

Upvotes: 2
