Reputation: 27
I have two DAGs, dag_a and dag_b (dag_a -> dag_b). When dag_a finishes, a TriggerDagRunOperator starts dag_b. The problem is that when dag_b is paused (OFF), dag_a's TriggerDagRunOperator still creates runs in dag_b, and they queue up for as long as dag_a keeps running. Once dag_b is turned back ON, all of the queued runs start executing. I'm looking for a way to make the TriggerDagRunOperator task conditional, for example a conditionally_trigger function that skips the task when dag_b is paused (OFF). How can I do this?
Upvotes: 0
Views: 1159
Reputation: 27
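import airflow.settings
from airflow.models import DagModel

def check_status_dag(*op_args):
    # op_args: (dag_id to check, task_id if not paused, task_id if paused)
    session = airflow.settings.Session()
    try:
        is_paused = (
            session.query(DagModel.is_paused)
            .filter(DagModel.dag_id == op_args[0])
            .scalar()
        )
    finally:
        session.close()  # always release the metadata DB session
    return op_args[2] if is_paused else op_args[1]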
Here check_status_dag is the callable that decides which branch to follow: op_args[0] is the dag_id of the DAG whose pause status is checked, and op_args[1] and op_args[2] are the task_ids to follow when that DAG is active or paused, respectively, as BranchPythonOperator expects.
start = DummyOperator(
    task_id='start',
    dag=dag
)
check_dag_B = BranchPythonOperator(
    task_id='check_dag_B',
    python_callable=check_status_dag,
    op_args=['dag_B', 'trigger_dag_B', 'skip_trigger_dag_B'],
    trigger_rule='all_done',
    dag=dag
)
trigger_dag_B = TriggerDagRunOperator(
    task_id='trigger_dag_B',
    trigger_dag_id='dag_B',
    dag=dag
)
skip_trigger_dag_B = DummyOperator(
    task_id='skip_trigger_dag_B',
    dag=dag
)
finish = DummyOperator(
    task_id='finish',
    trigger_rule='all_done',
    dag=dag
)
start >> check_dag_B >> [trigger_dag_B, skip_trigger_dag_B] >> finish  # or continue with further tasks
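The branching rule above can also be tried out without a running Airflow instance. The following is only a sketch: choose_branch and fake_metadata are hypothetical names that are not part of Airflow, and the lookup is passed in as a plain function so that the decision logic can be checked on its own. Unlike check_status_dag, this variant treats an unknown DAG as paused, which is an assumption you may not want.

```python
from typing import Callable, Optional


def choose_branch(
    dag_id: str,
    run_task_id: str,
    skip_task_id: str,
    get_is_paused: Callable[[str], Optional[bool]],
) -> str:
    """Return the task_id that a branching task should follow."""
    is_paused = get_is_paused(dag_id)
    # None means the DAG was not found; this sketch treats that like a paused DAG.
    if is_paused is None or is_paused:
        return skip_task_id
    return run_task_id


# Stand-in for the metadata lookup (hypothetical data, for illustration only).
fake_metadata = {"dag_B": True, "dag_C": False}

print(choose_branch("dag_B", "trigger_dag_B", "skip_trigger_dag_B", fake_metadata.get))
print(choose_branch("dag_C", "trigger_dag_C", "skip_trigger_dag_C", fake_metadata.get))
```

In a real DAG the lookup would be the DagModel query, and the returned string has to match the task_id of one of the tasks directly downstream of the branching task.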
Upvotes: 0
Reputation: 9308
You can use ShortCircuitOperator to execute or skip the downstream dag_b. Then use the Airflow REST API (or the shell/CLI) to find out whether dag_b is paused.
dag_a = TriggerDagRunOperator(
    trigger_dag_id='dag_a',
    ...
)
pause_check = ShortCircuitOperator(
    task_id='pause_check',
    python_callable=is_dag_paused,
    op_kwargs={
        'dag_id': 'dag_b'
    }
)
dag_b = TriggerDagRunOperator(
    trigger_dag_id='dag_b',
    ...
)
dag_a >> pause_check >> dag_b
The is_dag_paused function can look like this (here I use the REST API):
def is_dag_paused(**kwargs):
    import requests
    from requests.auth import HTTPBasicAuth

    dag_id = kwargs['dag_id']
    # airflow_host must be defined elsewhere (host and port of your Airflow webserver)
    res = requests.get(f'http://{airflow_host}/api/v1/dags/{dag_id}/details',
                       auth=HTTPBasicAuth('username', 'password'))  # The auth method could be different for you.
    if res.status_code == 200:
        rjson = res.json()
        # if you return True, the downstream tasks will be executed
        # if False, they will be skipped
        return not rjson['is_paused']
    else:
        # raise so the task fails visibly instead of exiting the process
        raise RuntimeError(f'Error: {res}')
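To make the short-circuit rule easier to check, the decision can be separated from the HTTP call. This is a minimal sketch, assuming the response body carries an is_paused field as in the function above; downstream_should_run is a hypothetical helper name, not an Airflow or requests API.

```python
def downstream_should_run(status_code: int, payload: dict) -> bool:
    """Turn an API response into the boolean a short-circuit callable returns."""
    if status_code != 200:
        # Fail loudly so the task is marked as failed rather than silently skipped.
        raise RuntimeError(f"Unexpected status code: {status_code}")
    # True lets downstream tasks run; False skips them.
    return not payload["is_paused"]


print(downstream_should_run(200, {"is_paused": False}))  # True
print(downstream_should_run(200, {"is_paused": True}))   # False
```

Inside is_dag_paused this would be called with res.status_code and res.json().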
Upvotes: 1