Hasitha

Reputation: 588

Externally trigger dag which has schedule_interval=None

I have a controller dag named dss_controller

dag = DAG(
    dag_id='dss_controller',
    default_args={
        "owner": "dss admin",
        "start_date": datetime.utcnow(),
    },
    schedule_interval=None,
)

and a target dag named dss_trigger_target_dag

dag = DAG(
    dag_id='dss_trigger_target_dag',
    default_args=args,
    schedule_interval=None,
)

Tasks are defined in both the controller and target dags, following the default Airflow examples.

This system works as expected when dss_controller's schedule_interval is set to "@once".

Then I set it to None and triggered the dag externally. This triggers the controller dag, moves it to the running state, and then moves it to the success state.

But it never triggers the dss_trigger_dagrun task of the controller dag. What is the reason for this behavior?

Was setting schedule_interval=None the reason? If so, how?
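
By "triggered externally" I mean a manual trigger from the command line; a minimal sketch of the command, assuming the Airflow 1.x CLI:

# Manually create a DagRun for the controller dag (Airflow 1.x CLI syntax)
airflow trigger_dag dss_controller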

This is my controller dag:

import pprint
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

pp = pprint.PrettyPrinter(indent=4)


def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to Trigger the remote DAG"""
    c_p = context['params']['condition_param']
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    if c_p:
        dag_run_obj.payload = {'message': context['params']['message']}
        pp.pprint(dag_run_obj.payload)
        return dag_run_obj

# Define the DAG
dag = DAG(
    dag_id='dss_controller',
    default_args={
        "owner": "dss admin",
        "start_date": datetime.utcnow(),
    },
    schedule_interval=None,
)

# Define the single task in this controller example DAG
trigger = TriggerDagRunOperator(
    task_id='dss_trigger_dagrun',
    trigger_dag_id="dss_trigger_target_dag",
    python_callable=conditionally_trigger,
    params={'condition_param': True, 'message': 'Hello Hasitha'},
    dag=dag,
)

and this is my target dag:

import pprint
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

pp = pprint.PrettyPrinter(indent=4)

args = {
    'start_date': datetime.utcnow(),
    'owner': 'dss admin',
}

dag = DAG(
    dag_id='dss_trigger_target_dag',
    default_args=args,
    schedule_interval=None,
)


def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".
          format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='target_run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)

Upvotes: 2

Views: 5724

Answers (1)

Ankur

Reputation: 106

You have added "start_date" as now() in the default arguments, which are passed to each of the tasks. That seems to be the real culprit: Airflow recommends against a dynamic start_date, since it can prevent tasks from ever being triggered. Try setting the start date to something in the past, such as airflow.utils.dates.days_ago(1).


Refer: https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
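
As a concrete illustration, here is a minimal sketch of the corrected controller DAG definition (assuming Airflow 1.x, where days_ago lives in airflow.utils.dates):

from airflow import DAG
from airflow.utils.dates import days_ago

# A static start_date in the past, instead of datetime.utcnow(),
# lets the scheduler actually run the task instances.
dag = DAG(
    dag_id='dss_controller',
    default_args={
        "owner": "dss admin",
        "start_date": days_ago(1),
    },
    schedule_interval=None,
)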

Upvotes: 2
