Reputation: 918
I need a pipeline that can be executed either manually or programmatically. Is this possible with Airflow? Right now it looks like every workflow MUST be tied to a schedule.
Upvotes: 3
Views: 1150
Reputation: 13036
In Airflow, every DAG is required to have a start date and schedule interval*, for example hourly:
from datetime import datetime, timedelta
from airflow import DAG

dag = DAG(
    dag_id='my_dag',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2018, 5, 23),
)
(Without a schedule how would it know when to run?)
As an alternative to a cron schedule, you can set the schedule to @once so that the DAG runs only once.
*One exception: You can omit the schedule for externally triggered DAGs because Airflow will not schedule them itself.
If you omit the schedule, however, you need to trigger the DAG externally. To start a DAG programmatically, for instance when a condition is met in another DAG, you can use the TriggerDagRunOperator. You may also see this pattern called externally triggered DAGs.
Here's a usage example from the Airflow Example DAGs:
File 1 - example_trigger_controller_dag.py:
"""This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
2. The Target DAG - DAG being triggered (in example_trigger_target_dag.py)

This example illustrates the following features :
1. A TriggerDagRunOperator that takes:
  a. A python callable that decides whether or not to trigger the Target DAG
  b. An optional params dict passed to the python callable to help in
     evaluating whether or not to trigger the Target DAG
  c. The id (name) of the Target DAG
  d. The python callable can add contextual info to the DagRun created by
     way of adding a Pickleable payload (e.g. dictionary of primitives). This
     state is then made available to the TargetDag
2. A Target DAG : c.f. example_trigger_target_dag.py
"""
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime

import pprint

pp = pprint.PrettyPrinter(indent=4)


def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to Trigger the remote DAG"""
    c_p = context['params']['condition_param']
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    if context['params']['condition_param']:
        dag_run_obj.payload = {'message': context['params']['message']}
        pp.pprint(dag_run_obj.payload)
        return dag_run_obj


# Define the DAG
dag = DAG(dag_id='example_trigger_controller_dag',
          default_args={"owner": "airflow",
                        "start_date": datetime.utcnow()},
          schedule_interval='@once')

# Define the single task in this controller example DAG
trigger = TriggerDagRunOperator(task_id='test_trigger_dagrun',
                                trigger_dag_id="example_trigger_target_dag",
                                python_callable=conditionally_trigger,
                                params={'condition_param': True,
                                        'message': 'Hello World'},
                                dag=dag)
File 2 - example_trigger_target_dag.py:
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime

import pprint

pp = pprint.PrettyPrinter(indent=4)

# This example illustrates the use of the TriggerDagRunOperator. There are 2
# entities at work in this scenario:
# 1. The Controller DAG - the DAG that conditionally executes the trigger
#    (in example_trigger_controller.py)
# 2. The Target DAG - DAG being triggered
#
# This example illustrates the following features :
# 1. A TriggerDagRunOperator that takes:
#   a. A python callable that decides whether or not to trigger the Target DAG
#   b. An optional params dict passed to the python callable to help in
#      evaluating whether or not to trigger the Target DAG
#   c. The id (name) of the Target DAG
#   d. The python callable can add contextual info to the DagRun created by
#      way of adding a Pickleable payload (e.g. dictionary of primitives). This
#      state is then made available to the TargetDag
# 2. A Target DAG : c.f. example_trigger_target_dag.py

args = {
    'start_date': datetime.utcnow(),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='example_trigger_target_dag',
    default_args=args,
    schedule_interval=None)


def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".
          format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag)

# You can also access the DagRun object in templates
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: '
                 '{{ dag_run.conf["message"] if dag_run else "" }}" ',
    dag=dag)
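The controller's callable can be tried out without a running scheduler. The following is a minimal sketch, not part of the Airflow examples: it repeats the decision logic from File 1 and calls it with a SimpleNamespace standing in for the object that TriggerDagRunOperator passes as dag_run_obj. The stand-in and the make_order helper are assumptions made purely for illustration. Returning the object asks for the target DAG to be triggered; returning None skips it.

```python
from types import SimpleNamespace


def conditionally_trigger(context, dag_run_obj):
    """Return dag_run_obj (with a payload) to trigger, or None to skip."""
    params = context['params']
    if params['condition_param']:
        # The payload is what the target DAG later reads from dag_run.conf.
        dag_run_obj.payload = {'message': params['message']}
        return dag_run_obj
    return None


def make_order():
    # Hypothetical stand-in for the object Airflow hands to the callable.
    return SimpleNamespace(run_id='manual_test', payload=None)


triggered = conditionally_trigger(
    {'params': {'condition_param': True, 'message': 'Hello World'}},
    make_order())
skipped = conditionally_trigger(
    {'params': {'condition_param': False, 'message': 'Hello World'}},
    make_order())

print(triggered.payload)
print(skipped)
```

This callable-based signature matches the Airflow 1.x example above; later releases changed the operator's interface, so check the documentation for the version you run.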
Upvotes: 2
Reputation: 990
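Yes, this can be achieved by passing None to schedule_interval. Note that schedule_interval is an argument of the DAG itself, so pass it to the DAG constructor rather than putting it in default_args: default_args are handed on to the tasks, and a schedule_interval placed there does not change the DAG's schedule.
See the Airflow documentation on DAG runs.
For example:
from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 12, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'my_dag',  # placeholder DAG id
    default_args=default_args,
    schedule_interval=None,  # Check this line
)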
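A DAG without a schedule only runs when something starts it. As a rough sketch of the programmatic side, the helper below builds the command line for Airflow's trigger command and leaves running it to subprocess. The helper name build_trigger_command, the airflow_2 flag, and the DAG id my_dag are made up for illustration. The subcommand is `airflow trigger_dag` in Airflow 1.x and `airflow dags trigger` in 2.x, so check which one your installation provides.

```python
import json
import shlex


def build_trigger_command(dag_id, conf=None, airflow_2=False):
    """Build the argv list for triggering a DAG from the Airflow CLI."""
    # Airflow 1.x: `airflow trigger_dag`; Airflow 2.x: `airflow dags trigger`.
    if airflow_2:
        cmd = ['airflow', 'dags', 'trigger']
    else:
        cmd = ['airflow', 'trigger_dag']
    if conf is not None:
        # --conf takes a JSON string; the target DAG sees it as dag_run.conf.
        cmd += ['--conf', json.dumps(conf)]
    cmd.append(dag_id)
    return cmd


cmd = build_trigger_command('my_dag', {'message': 'Hello World'})
# Print a shell-safe version of the command for inspection.
print(' '.join(shlex.quote(part) for part in cmd))

# On a machine with Airflow installed, the command could be run with:
# import subprocess
# subprocess.run(cmd, check=True)
```

Building the argument list separately from running it keeps the quoting of the JSON payload out of the shell and makes the command easy to log or test.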
Upvotes: 2
Reputation: 3214
Just set schedule_interval to None when you create the DAG:
dag = DAG('workflow_name',
          template_searchpath='path',
          schedule_interval=None,
          default_args=default_args)
From the Airflow Manual:
Each DAG may or may not have a schedule, which informs how DAG Runs are created. schedule_interval is defined as a DAG arguments, and receives preferably a cron expression as a str, or a datetime.timedelta object.
The manual then goes on to list some cron 'presets', one of which is None.
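With schedule_interval=None the DAG can also be started over HTTP. Here is a hedged sketch that only prepares the request and does not send it. It assumes the experimental REST endpoint of Airflow 1.x (POST /api/experimental/dags/<dag_id>/dag_runs) with conf passed as a JSON-encoded string, and a webserver at http://localhost:8080; the function name build_trigger_request is invented for this example. Airflow 2.x offers a different, stable REST API, so adjust the URL and body to your version.

```python
import json
import urllib.request


def build_trigger_request(base_url, dag_id, conf=None):
    """Prepare (but do not send) a POST asking Airflow to start a DAG run."""
    # Assumed endpoint: the Airflow 1.x experimental REST API.
    url = '{}/api/experimental/dags/{}/dag_runs'.format(
        base_url.rstrip('/'), dag_id)
    # conf is sent as a JSON string nested inside the JSON body (assumption
    # based on the 1.x experimental API; other versions may differ).
    body = json.dumps({'conf': json.dumps(conf or {})}).encode('utf-8')
    return urllib.request.Request(
        url, data=body, method='POST',
        headers={'Content-Type': 'application/json'})


req = build_trigger_request(
    'http://localhost:8080', 'workflow_name', {'message': 'hi'})
print(req.get_method(), req.full_url)

# Against a running webserver, the request could be sent with:
# urllib.request.urlopen(req)
```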
Upvotes: 4