Andrew Yar

Reputation: 27

conditionally_trigger for TriggerDagRunOperator

I have two DAGs, dag_a and dag_b (dag_a -> dag_b). After dag_a finishes, a TriggerDagRunOperator fires and starts dag_b. The problem is that while dag_b is paused (OFF), dag_a's TriggerDagRunOperator keeps creating runs for dag_b, and they queue up for as long as dag_a keeps running. Once dag_b is switched back ON, all the queued runs begin executing. I'm looking for a conditionally_trigger-style check for TriggerDagRunOperator that skips the trigger task when dag_b is paused (OFF). How can I do this?

Upvotes: 0

Views: 1159

Answers (2)

Andrew Yar

Reputation: 27

import airflow.settings
from airflow.models import DagModel

def check_status_dag(*op_args):
    """Branch callable: return the task_id to follow, depending on
    whether the DAG identified by op_args[0] is paused."""
    session = airflow.settings.Session()
    try:
        qry = session.query(DagModel).filter(DagModel.dag_id == op_args[0])
        if not qry.value(DagModel.is_paused):
            return op_args[1]  # target DAG is active: follow the trigger task
        return op_args[2]      # target DAG is paused: follow the skip task
    finally:
        session.close()

Here check_status_dag is the callable that makes the branching decision: op_args[0] is the dag_id of the DAG whose pause status is checked, while op_args[1] and op_args[2] are the task_ids the BranchPythonOperator should follow when that DAG is active or paused, respectively.

from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

start = DummyOperator(
    task_id='start',
    dag=dag,
)

check_dag_B = BranchPythonOperator(
    task_id='check_dag_B',
    python_callable=check_status_dag,
    op_args=['dag_B', 'trigger_dag_B', 'skip_trigger_dag_B'],
    trigger_rule='all_done',
    dag=dag,
)

trigger_dag_B = TriggerDagRunOperator(
    task_id='trigger_dag_B',
    trigger_dag_id='dag_B',
    dag=dag,
)

skip_trigger_dag_B = DummyOperator(
    task_id='skip_trigger_dag_B',
    dag=dag,
)

finish = DummyOperator(
    task_id='finish',
    trigger_rule='all_done',  # runs whether the trigger ran or was skipped
    dag=dag,
)

start >> check_dag_B >> [trigger_dag_B, skip_trigger_dag_B] >> finish  # or continue working
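The branch decision itself reduces to a mapping from the pause flag to a task_id. Factored out of the session query, the choice logic can be sketched and checked on its own (choose_branch is a hypothetical helper for illustration, not part of the answer's code):

```python
def choose_branch(is_paused, trigger_task_id, skip_task_id):
    """Pick the task_id a BranchPythonOperator should follow:
    skip the trigger when the target DAG is paused."""
    return skip_task_id if is_paused else trigger_task_id

# Paused target DAG -> follow the skip task
print(choose_branch(True, 'trigger_dag_B', 'skip_trigger_dag_B'))   # skip_trigger_dag_B
# Active target DAG -> follow the trigger task
print(choose_branch(False, 'trigger_dag_B', 'skip_trigger_dag_B'))  # trigger_dag_B
```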

Upvotes: 0

Emma

Reputation: 9308

You can use a ShortCircuitOperator to execute or skip the downstream trigger for dag_b. To find out whether dag_b is paused, query the Airflow REST API (or use the shell/CLI).

dag_a = TriggerDagRunOperator(
    trigger_dag_id='dag_a',
    ...
)

pause_check = ShortCircuitOperator(
    task_id='pause_check',
    python_callable=is_dag_paused,
    op_kwargs={
        'dag_id': 'dag_b'
    }
)

dag_b = TriggerDagRunOperator(
    trigger_dag_id='dag_b',
    ...
)

dag_a >> pause_check >> dag_b

The is_dag_paused function can look like this (here I use the REST API):

def is_dag_paused(**kwargs):
    import requests
    from requests.auth import HTTPBasicAuth

    dag_id = kwargs['dag_id']
    res = requests.get(f'http://{airflow_host}/api/v1/dags/{dag_id}/details',
                       auth=HTTPBasicAuth('username', 'password'))  # The auth method could be different for you.

    if res.status_code == 200:
        # Returning True lets the downstream tasks execute;
        # returning False makes the ShortCircuitOperator skip them.
        return not res.json()['is_paused']
    # Raise so the task fails visibly instead of killing the worker process.
    raise RuntimeError(f'Failed to fetch DAG details: {res.status_code}')
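The response handling can be separated from the HTTP call so it is easy to check in isolation; parse_pause_check below is a hypothetical helper mirroring the logic above (a 200 response yields the short-circuit boolean, anything else raises):

```python
def parse_pause_check(status_code, payload):
    """Turn a /dags/{dag_id}/details response into the ShortCircuitOperator
    return value: True lets downstream tasks run, False skips them."""
    if status_code != 200:
        raise RuntimeError(f'Failed to fetch DAG details: {status_code}')
    return not payload['is_paused']

print(parse_pause_check(200, {'is_paused': True}))   # False -> skip the trigger
print(parse_pause_check(200, {'is_paused': False}))  # True  -> run the trigger
```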

Upvotes: 1
