Reputation: 81
Colleagues, we need help. There are two DAGs, Parent and Child. Parent has its own schedule, say '30 * * * *', and Child has '1 8-17 * * 1-5'. Child should wait for Parent to finish, for example up to 40 minutes: if Parent ends with an error, Child should also fail with an error; otherwise the next task of the child DAG should run. The problem is that this does not work even in the simplest case, and I don't understand how to synchronize them. I wrote code like this:
DAG Parent
import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskMarker

start_date = datetime(2021, 3, 1, 20, 36, 0)


class Exept(Exception):
    pass


def wait():
    # Simulate work; fail when the control file contains 'err'.
    time.sleep(3)
    with open('etl.txt', 'r') as txt:
        line = txt.readline()
        if line == 'err':
            print(1)
            raise Exept
    return 'etl success'


with DAG(
    dag_id="dag_etl1",
    start_date=start_date,
    schedule_interval='* * * * *',
    tags=['example2'],
) as etl1:
    parent_task = ExternalTaskMarker(
        task_id="parent_task",
        external_dag_id="dag_etl1",
        external_task_id="etl_child",
    )
    wait_timer = PythonOperator(task_id='wait_timer', python_callable=wait)

    wait_timer >> parent_task
DAG Child
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from etl_parent import etl1, parent_task

start_date = datetime(2021, 3, 1, 20, 36, 0)


def check():
    return 'I succeeded'


with DAG(
    dag_id='etl_child',
    start_date=start_date,
    schedule_interval='* * * * *',
    tags=['testing_child_dag'],
) as etl_child:
    # Wait for the parent DAG's marker task to finish successfully.
    status = ExternalTaskSensor(
        task_id="dag_etl1",
        external_dag_id=etl1.dag_id,
        external_task_id=parent_task.task_id,
        allowed_states=['success'],
        mode='reschedule',
        execution_delta=timedelta(minutes=1),
        timeout=60,
    )
    task1 = PythonOperator(task_id='task1', python_callable=check)

    status >> task1
As you can see, I'm trying to emulate a situation where the parent task fails if 'err' is written in the text file and succeeds in any other case. But this does not work at all as I expect. On the first run of the DAGs everything works correctly. If I then change the data in the text file, the parent still behaves correctly: for example, if I launch the parent with a deliberate error, the parent fails and the child fails too, as expected. But if I change the text back, the parent again works correctly while the child keeps failing for a while; eventually it may come right, but not reliably. If the run is deliberately successful, the situation is the same, only reversed. I also don't understand how to organize waiting for the task to complete in the parent DAG.
Please help :) I have been working with Airflow only recently, so I may be missing something.
Upvotes: 7
Views: 9823
Reputation: 301
It seems that your DAGs (DAG A and DAG B) have two completely different schedules. You may still want to use the ExternalTaskSensor to avoid concurrent-modification issues and to assert that DAG A is not currently running before triggering DAG B. In that case, you can use the execution_date_fn parameter:
ExternalTaskSensor(
    task_id="sensor",
    external_dag_id="dag_id",
    execution_date_fn=get_most_recent_dag_run,
    mode="reschedule",
)
where the get_most_recent_dag_run function looks like this:
from airflow.models import DagRun


def get_most_recent_dag_run(dt):
    dag_runs = DagRun.find(dag_id="dag_id")
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    if dag_runs:
        return dag_runs[0].execution_date
The get_most_recent_dag_run function looks up the latest DAG run for the provided dag_id, which lets the ExternalTaskSensor target that run regardless of whether the two schedules line up.
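Put together, a child DAG using this approach could look like the sketch below. The dag_etl1 and etl_child IDs come from the question; the task names, schedule, and check callable are my own illustrative choices:

from datetime import datetime

from airflow import DAG
from airflow.models import DagRun
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor


def get_most_recent_dag_run(dt):
    # Ignore the child's own execution date and target the parent's
    # most recent run instead.
    dag_runs = DagRun.find(dag_id="dag_etl1")
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    if dag_runs:
        return dag_runs[0].execution_date


def check():
    return 'I succeeded'


with DAG(
    dag_id="etl_child",
    start_date=datetime(2021, 3, 1),
    schedule_interval="1 8-17 * * 1-5",
) as etl_child:
    # With no external_task_id, the sensor waits for the whole parent
    # DAG run (found via execution_date_fn) to reach a success state.
    wait_for_parent = ExternalTaskSensor(
        task_id="wait_for_parent",
        external_dag_id="dag_etl1",
        execution_date_fn=get_most_recent_dag_run,
        mode="reschedule",
    )
    task1 = PythonOperator(task_id="task1", python_callable=check)

    wait_for_parent >> task1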
Upvotes: 8
Reputation: 313
The most common cause of problems with ExternalTaskSensor is the execution_delta parameter, so I would start there.
I see that both the parent and the child DAG have exactly the same start_date and schedule_interval, yet your execution_delta is 1 minute. In that case the child DAG looks for a parent run that started at 20:35 (in your example), but the parent actually started at 20:36, so the sensor fails. As a test, try setting your parent DAG to start at 20:35 and see if that solves the problem.
Here's a good article that goes into a bit more detail about the schedule_interval pitfall: https://medium.com/@fninsiima/sensing-the-completion-of-external-airflow-tasks-827344d03142
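To make the fix concrete: since both DAGs share start_date and schedule_interval, the sensor should look at the same execution date, i.e. use no delta at all. A minimal sketch based on the question's code (the wait_for_parent task_id is my own choice):

from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

with DAG(
    dag_id="etl_child",
    start_date=datetime(2021, 3, 1, 20, 36, 0),
    schedule_interval="* * * * *",
) as etl_child:
    # Parent and child run on the same schedule, so the matching parent
    # run has the *same* execution date: no execution_delta is needed.
    status = ExternalTaskSensor(
        task_id="wait_for_parent",
        external_dag_id="dag_etl1",
        external_task_id="parent_task",
        allowed_states=["success"],
        mode="reschedule",
        timeout=60,
    )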
As for the waiting time, that's the timeout parameter of the ExternalTaskSensor; in your case it waits 60 seconds before failing. Personally, I'd be very cautious about setting a long timeout. While a sensor is waiting in the default poke mode, it occupies a worker slot that no other task can use, which can result in other tasks being locked out of execution, especially if you have a lot of sensors.
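If the child genuinely has to wait a long time (the question mentions up to 40 minutes), mode='reschedule' softens this problem, because the sensor releases its worker slot between pokes instead of holding it. A sketch with illustrative poke_interval and timeout values:

from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

with DAG(
    dag_id="etl_child",
    start_date=datetime(2021, 3, 1, 20, 36, 0),
    schedule_interval="* * * * *",
) as etl_child:
    status = ExternalTaskSensor(
        task_id="wait_for_parent",
        external_dag_id="dag_etl1",
        external_task_id="parent_task",
        # 'reschedule' frees the worker slot between pokes, unlike the
        # default 'poke' mode, which holds it for the whole wait.
        mode="reschedule",
        poke_interval=60,   # check once a minute
        timeout=40 * 60,    # give up after 40 minutes
    )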
Upvotes: 2