Is it possible to have a pipeline in Airflow that is not tied to any schedule?

I need a pipeline that will be executed either manually or programmatically. Is this possible with Airflow? It looks like each workflow currently MUST be tied to a schedule.

Upvotes: 3

Views: 1150

Answers (3)

Taylor D. Edmiston

Reputation: 13036

In Airflow, every DAG is required to have a start date and schedule interval*, for example hourly:

from datetime import datetime, timedelta

from airflow import DAG

# Run once an hour, starting from 23 May 2018
dag = DAG(
    dag_id='my_dag',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2018, 5, 23),
)

(Without a schedule, how would it know when to run?)

As an alternative to a timedelta or cron schedule, you can set the schedule to @once so that the DAG runs only once.

*One exception: You can set schedule_interval=None for externally triggered DAGs, because Airflow will not schedule them itself. (Leaving the argument out entirely is not the same thing: in the Airflow 1.x versions this answer targets, the DAG then falls back to a default daily interval.)

If you do set the schedule to None, you need to trigger the DAG externally somehow. If you want to be able to call a DAG programmatically, for instance as a result of a separate condition occurring in another DAG, you can do that with the TriggerDagRunOperator. You might also hear this idea called externally triggered DAGs.

Here's a usage example from the Airflow Example DAGs:

File 1 - example_trigger_controller_dag.py:

"""This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
2. The Target DAG - DAG being triggered (in example_trigger_target_dag.py)

This example illustrates the following features :
1. A TriggerDagRunOperator that takes:
  a. A python callable that decides whether or not to trigger the Target DAG
  b. An optional params dict passed to the python callable to help in
     evaluating whether or not to trigger the Target DAG
  c. The id (name) of the Target DAG
  d. The python callable can add contextual info to the DagRun created by
     way of adding a Pickleable payload (e.g. dictionary of primitives). This
     state is then made available to the TargetDag
2. A Target DAG : c.f. example_trigger_target_dag.py
"""

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime

import pprint

pp = pprint.PrettyPrinter(indent=4)


def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to Trigger the remote DAG"""
    c_p = context['params']['condition_param']
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    if context['params']['condition_param']:
        dag_run_obj.payload = {'message': context['params']['message']}
        pp.pprint(dag_run_obj.payload)
        return dag_run_obj


# Define the DAG
dag = DAG(dag_id='example_trigger_controller_dag',
          default_args={"owner": "airflow",
                        "start_date": datetime.utcnow()},
          schedule_interval='@once')


# Define the single task in this controller example DAG
trigger = TriggerDagRunOperator(task_id='test_trigger_dagrun',
                                trigger_dag_id="example_trigger_target_dag",
                                python_callable=conditionally_trigger,
                                params={'condition_param': True,
                                        'message': 'Hello World'},
                                dag=dag)
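
The role of the callable's return value is easy to miss in the controller file above. In the Airflow 1.x operator shown here, returning dag_run_obj triggers the target DAG, while returning None skips the trigger. The following is a minimal sketch that exercises the same logic outside Airflow; FakeDagRunOrder is a hypothetical stand-in for the object Airflow passes in, not a real Airflow class:

```python
class FakeDagRunOrder:
    """Hypothetical stand-in for the dag_run_obj that Airflow 1.x supplies."""

    def __init__(self):
        self.payload = None


def conditionally_trigger(context, dag_run_obj):
    """Return dag_run_obj to request a trigger, or None to skip it."""
    if context['params']['condition_param']:
        # The payload becomes dag_run.conf in the target DAG.
        dag_run_obj.payload = {'message': context['params']['message']}
        return dag_run_obj
    return None


# Condition met: the object comes back with a payload attached.
triggered = conditionally_trigger(
    {'params': {'condition_param': True, 'message': 'Hello World'}},
    FakeDagRunOrder())

# Condition not met: nothing is returned, so no DagRun would be created.
skipped = conditionally_trigger(
    {'params': {'condition_param': False, 'message': 'Hello World'}},
    FakeDagRunOrder())

print(triggered.payload)
print(skipped)
```

Note that later Airflow releases (2.x) dropped the python_callable argument from TriggerDagRunOperator in favour of a conf argument, so this pattern applies to the 1.x API used in this answer.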

File 2 - example_trigger_target_dag.py:

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime

import pprint
pp = pprint.PrettyPrinter(indent=4)

# This example illustrates the use of the TriggerDagRunOperator. There are 2
# entities at work in this scenario:
# 1. The Controller DAG - the DAG that conditionally executes the trigger
#    (in example_trigger_controller.py)
# 2. The Target DAG - DAG being triggered
#
# This example illustrates the following features :
# 1. A TriggerDagRunOperator that takes:
#   a. A python callable that decides whether or not to trigger the Target DAG
#   b. An optional params dict passed to the python callable to help in
#      evaluating whether or not to trigger the Target DAG
#   c. The id (name) of the Target DAG
#   d. The python callable can add contextual info to the DagRun created by
#      way of adding a Pickleable payload (e.g. dictionary of primitives). This
#      state is then made available to the TargetDag
# 2. A Target DAG : c.f. example_trigger_target_dag.py

args = {
    'start_date': datetime.utcnow(),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='example_trigger_target_dag',
    default_args=args,
    schedule_interval=None)


def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".
          format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag)


# You can also access the DagRun object in templates
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: '
                 '{{ dag_run.conf["message"] if dag_run else "" }}" ',
    dag=dag)
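
Besides TriggerDagRunOperator, a DAG with schedule_interval=None can also be started from outside Airflow through the command line, which is often the simplest way to trigger it from a script. The sketch below only builds the command; build_trigger_command is a hypothetical helper name, and it assumes the airflow executable is on the PATH of whatever runs it. The subcommand is `airflow dags trigger` in Airflow 2.x and `airflow trigger_dag` in Airflow 1.x:

```python
import json
import shlex


def build_trigger_command(dag_id, conf=None, airflow_2=True):
    """Build the CLI command that triggers a single run of dag_id."""
    if airflow_2:
        cmd = ['airflow', 'dags', 'trigger']
    else:
        cmd = ['airflow', 'trigger_dag']
    if conf is not None:
        # --conf takes a JSON string; it is exposed as dag_run.conf in the DAG.
        cmd += ['--conf', json.dumps(conf)]
    cmd.append(dag_id)
    return cmd


cmd = build_trigger_command('example_trigger_target_dag',
                            {'message': 'Hello World'})

# Print a shell-safe version of the command for inspection.
print(' '.join(shlex.quote(part) for part in cmd))

# To actually run it (requires an Airflow installation):
#   import subprocess
#   subprocess.run(cmd, check=True)
```

Passing the command as a list to subprocess.run avoids shell quoting problems with the JSON payload.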

Upvotes: 2

gruby

Reputation: 990

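Yes, this can be achieved by passing schedule_interval=None to the DAG. Note that schedule_interval is an argument of the DAG itself; default_args are handed to the DAG's tasks, so setting it there does not change the schedule.

See the Airflow documentation on DAG runs.

For example:

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 12, 1),
    'email': ['airflow@example.com'],  # placeholder address
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'my_dag',  # hypothetical DAG id
    default_args=default_args,
    schedule_interval=None,  # Check this line
)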

Upvotes: 2

Gregory Arenius

Reputation: 3214

Just set the schedule_interval to None when you create the DAG:

dag = DAG('workflow_name',
          template_searchpath='path',
          schedule_interval=None,
          default_args=default_args)

From the Airflow Manual:

Each DAG may or may not have a schedule, which informs how DAG Runs are created. schedule_interval is defined as a DAG arguments, and receives preferably a cron expression as a str, or a datetime.timedelta object.

The manual then goes on to list some cron 'presets', one of which is None.
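
Once the DAG has no schedule, a run can also be started programmatically over HTTP. The sketch below assumes an Airflow 2.x webserver with the stable REST API enabled, where a run is created with POST /api/v1/dags/{dag_id}/dagRuns (Airflow 1.10 exposed a similar experimental endpoint under /api/experimental instead). The base URL and the build_trigger_request helper are assumptions for illustration, and authentication is deliberately left out because it depends on how the deployment is configured:

```python
import json
import urllib.request


def build_trigger_request(base_url, dag_id, conf=None):
    """Build (but do not send) a POST request that creates a DagRun."""
    url = '{}/api/v1/dags/{}/dagRuns'.format(base_url.rstrip('/'), dag_id)
    # The optional conf dict is made available to tasks as dag_run.conf.
    body = json.dumps({'conf': conf or {}}).encode('utf-8')
    return urllib.request.Request(
        url,
        data=body,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


# 'http://localhost:8080' is an assumed webserver address.
req = build_trigger_request('http://localhost:8080', 'workflow_name',
                            {'message': 'Hello World'})
print(req.get_method(), req.full_url)

# To send it, add the auth headers your deployment requires, then:
#   urllib.request.urlopen(req)
```

Building the request separately from sending it keeps the example runnable without a live Airflow instance and makes the URL and payload easy to inspect.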

Upvotes: 4
