Reputation: 61
I am trying to implement DAG dependency between 2 DAGs say A and B. DAG A runs once every hour and DAG B runs every 15 mins.
from datetime import datetime,timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
default_args = {
'owner': 'dependency',
'depends_on_past': False,
'start_date': datetime(2020, 9, 10, 10, 1),
'email': [''],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
with DAG('DAG_A', schedule_interval='0/60 * * * *',max_active_runs=1, catchup=False,
default_args=default_args) as dag:
task1 = DummyOperator(task_id='task1', retries=1, dag=dag)
task2 = DummyOperator(task_id='task2', retries=1, dag=dag)
task3 = DummyOperator(task_id='task3', retries=1, dag=dag)
task1 >> task2 >> task3
from datetime import datetime,timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
default_args = {
'owner': 'dependency',
'depends_on_past': False,
'start_date': datetime(2020, 9, 10, 10, 1),
'email': [''],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
with DAG('DAG_B', schedule_interval='0/15 * * * *',max_active_runs=1, catchup=False,
default_args=default_args) as dag:
task4 = DummyOperator(task_id='task4', retries=1, dag=dag)
task5 = DummyOperator(task_id='task5', retries=1, dag=dag)
task6 = DummyOperator(task_id='task6', retries=1, dag=dag)
task4 >> task5 >> task6
I have tried using ExternalTaskSensor operator. I am unable to understand if the sensor finds DAG A to be in success state it triggers the next task else wait for the task to complete.
Thanks in advance.
Upvotes: 5
Views: 2329
Reputation: 4873
You could use ExternalTaskSensor to achieve what you are looking for. The key aspect is to initialize this sensor with the correct execution_date
, being that in your example the execution_date
of the last DagRun
of DAG_A.
Check this example where DAG_A runs every 9 minutes for 200 seconds. DAG_B runs every 3 minutes and runs for 30 seconds. These values are arbitrary and only for demo purpose, could be pretty much anything.
DAG A (nothing new here):
import time
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
def _executing_task(**kwargs):
print("Starting task_a")
print("Completed task_a")
dag = DAG(
default_args={"owner": "airflow"},
schedule_interval="*/9 * * * *",
with dag:
start = DummyOperator(
task_a = PythonOperator(
chain(start, task_a)
import time
from airflow import DAG
from airflow.utils.db import provide_session
from airflow.models.dag import get_last_dagrun
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.sensors.external_task import ExternalTaskSensor
def _executing_task():
print("Completed task_b")
def _get_execution_date_of_dag_a(exec_date, session=None, **kwargs):
dag_a_last_run = get_last_dagrun(
'example_external_task_sensor_a', session)
print(f"EXEC DATE: {dag_a_last_run.execution_date}")
return dag_a_last_run.execution_date
dag = DAG(
default_args={"owner": "airflow"},
schedule_interval="*/3 * * * *",
with dag:
start = DummyOperator(
wait_for_dag_a = ExternalTaskSensor(
allowed_states=['success', 'failed'],
task_b = PythonOperator(
chain(start, wait_for_dag_a, task_b)
We are using the param execution_date_fn
of the ExternalTaskSensor
in order to obtain the execution_date
of the last DagRun of the DAG_A, if we don't do so, it will wait for DAG_A with the same execution_date
as the actual run of DAG_B which may not exists in many cases.
The function _get_execution_date_of_dag_a
does a query to the metadata DB to obtain the exec_date by using get_last_dagrun
from Airflow models.
Finally the other important parameter is allowed_states=['success', 'failed']
where we are telling it to wait until DAG_A is found in one of those states (i.e if it is in running
state will keep executing poke).
Try it out and let me know if it worked for you!.
Upvotes: 3
Reputation: 20097
I think the only way you can achieve that in "general" way is to use some external locking mechanism
You can achieve quite a good approximation though using pools:
if you set pool size to 1 and assign both dag A and B to the pool, only one of those can be running at a time. You can also add priority_weight in the way that you see best - in case you need to prioritise A over B or the other way round.
Upvotes: 1