Reputation: 265
I have an Airflow DAG that is scheduled to run at 10 am @daily. I want to control the DAG trigger so that if it is not scheduled between 10:00 am and 10:30 am, it won't run.
LatestOnlyOperator can't help, since the execution window is the whole day: the moment I enable scheduling on it, the DAG will be triggered.
It turns out that instead of a 30-minute window, my flow requires a dependency on an external DAG that is triggered manually.
Let the scheduled DAG be DAG1 and the manually triggered DAG be DAG2.
If DAG2 has been triggered for the day, DAG1 should check the state of the last DAG2 instance for the execution date (today):
if Success, skip the rest of the tasks;
if NotExecuted or Failed, run the remaining tasks.
I have created a branch operator to do this, but I am not able to fetch the last instance of task1 from DAG2, and I am not able to correctly determine the execution_date for task1.
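from datetime import timedelta

from airflow.configuration import conf
from airflow.models import DagBag, TaskInstance
from airflow.operators.python_operator import BranchPythonOperator  # airflow.operators.python in Airflow 2.x

def branchTo(**kwargs):
    # Subtracting a negative delta moves the date one hour forward.
    date = kwargs['execution_date'] - timedelta(hours=-1)
    # Load DAG2 from the DAGs folder and look up its task1.
    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags['DAG2']
    my_task = check_dag.get_task('task1')
    # State of task1 for exactly that execution date.
    ti = TaskInstance(my_task, date)
    state = ti.current_state()
    if state != 'success':
        return 'procees_with_remaining_tasks'
    else:
        return 'mark_success'

# Created after branchTo so the callable exists when the operator is built.
branching = BranchPythonOperator(
    task_id='branching',
    python_callable=branchTo,
    dag=dag)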
Since it is hard to get the task instance of a manually triggered run, I am querying a time range to fetch all runs executed for a given 'dag_id' and 'task_id',
and using the output for the conditional jump.
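from datetime import datetime

from airflow.models import TaskInstance
from airflow.utils.db import provide_session  # airflow.utils.session in Airflow 2.x

@provide_session
def get_task_instances(dag_id, task_id, start_date=None, end_date=None, session=None):
    TI = TaskInstance
    end_date = end_date or datetime.utcnow()
    query = session.query(TI).filter(
        TI.dag_id == dag_id,
        TI.task_id == task_id,
        TI.execution_date <= end_date,
    )
    # Apply the lower bound only when one is given.
    if start_date is not None:
        query = query.filter(TI.execution_date >= start_date)
    # Most recent execution first.
    return query.order_by(TI.execution_date.desc()).all()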
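To make the conditional jump concrete, here is a minimal sketch of the decision step in plain Python, with no Airflow imports. It assumes the query results have already been reduced to (execution_date, state) pairs; the helper names day_window, latest_state_in_day and choose_branch are hypothetical, and only the two returned task ids come from the code above.

```python
from datetime import datetime, timedelta
from typing import Iterable, Optional, Tuple

# Hypothetical stand-in for the rows returned by the query above:
# one (execution_date, state) pair per DAG2 task instance.
Run = Tuple[datetime, Optional[str]]


def day_window(execution_date: datetime) -> Tuple[datetime, datetime]:
    """Return [midnight, next midnight) for the day containing execution_date."""
    start = execution_date.replace(hour=0, minute=0, second=0, microsecond=0)
    return start, start + timedelta(days=1)


def latest_state_in_day(runs: Iterable[Run], execution_date: datetime) -> Optional[str]:
    """State of the most recent run inside the day window, or None if there is none."""
    start, end = day_window(execution_date)
    in_window = [run for run in runs if start <= run[0] < end]
    if not in_window:
        return None
    return max(in_window, key=lambda run: run[0])[1]


def choose_branch(runs: Iterable[Run], execution_date: datetime) -> str:
    """Skip the remaining tasks only when today's latest DAG2 run succeeded."""
    state = latest_state_in_day(runs, execution_date)
    if state == 'success':
        return 'mark_success'
    # Covers both "failed" and "not executed today" (state is None).
    return 'procees_with_remaining_tasks'
```

Inside the branch callable, the pairs could be built from the query result with something like [(ti.execution_date, ti.state) for ti in instances]. Filtering on a whole-day window avoids having to guess the exact execution_date of the manual run.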
Upvotes: 4
Views: 3410
Reputation: 15931
You can use BranchDateTimeOperator at the beginning of your workflow to check the current time and, based on it, decide whether to continue executing the workflow or branch to an end task.
import datetime

from airflow.operators.datetime import BranchDateTimeOperator

cond = BranchDateTimeOperator(
    task_id='datetime_branch',
    follow_task_ids_if_true=['continue_task'],  # current time is inside the window
    follow_task_ids_if_false=['end_task'],      # current time is outside the window
    target_upper=datetime.time(10, 30, 0),
    target_lower=datetime.time(10, 0, 0),
    dag=dag,
)
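For intuition, the window check can be sketched in plain Python. This is only an illustration of the comparison, not the operator's actual implementation: the function names in_window and choose_branch are hypothetical, both bounds are assumed to be inclusive, and time zones and windows that cross midnight are ignored.

```python
from datetime import datetime, time


def in_window(now: datetime, lower: time, upper: time) -> bool:
    """True when the time of day of `now` lies within [lower, upper]."""
    return lower <= now.time() <= upper


def choose_branch(now: datetime,
                  lower: time = time(10, 0),
                  upper: time = time(10, 30)) -> str:
    """Return the task id to follow, mirroring the two branches above."""
    return 'continue_task' if in_window(now, lower, upper) else 'end_task'
```

With these defaults, a run that starts at 10:15 would follow continue_task, while one that starts at 10:45 would follow end_task.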
Upvotes: 4