Lalit Lakhotia

Reputation: 265

How to make airflow skip a schedule?

I have an Airflow DAG that is scheduled to run daily at 10 am. I want to control the DAG trigger so that if a run is not scheduled between 10:00 am and 10:30 am, it is not run.

LatestOnlyOperator can't help, since the execution interval covers the whole day: the moment I enable scheduling, the DAG is triggered.

It turns out that, instead of a 30-minute window, my flow needs a dependency on an external DAG that is triggered manually.

Let the scheduled DAG be DAG1 and the manually triggered DAG be DAG2.

If DAG2 has been triggered for the day, DAG1 should check the state of the latest DAG2 instance for the execution date (today):

If it succeeded, skip the rest of the tasks.

If it was not executed or it failed, run the remaining tasks.
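The rule above can be sketched as a pure function kept separate from Airflow, so it is easy to test. This is a minimal sketch under assumptions: the function name `choose_branch` is hypothetical, the input is assumed to be today's DAG2 task1 states ordered newest first, and an empty list stands for "not executed". The state strings mirror Airflow's `'success'` / `'failed'` values, and the returned ids are the task ids from the question.

```python
from typing import Optional, Sequence

# Task ids taken from the question's DAG1; adjust to your own DAG.
SKIP_TASK = "mark_success"
RUN_TASK = "procees_with_remaining_tasks"


def choose_branch(dag2_states: Sequence[Optional[str]]) -> str:
    """Pick DAG1's next task from today's DAG2 task1 states (newest first).

    Hypothetical helper: an empty sequence means DAG2 has not run today.
    """
    if not dag2_states:
        # "NotExecuted": DAG2 was never triggered today, so DAG1 does the work
        return RUN_TASK
    latest = dag2_states[0]
    # Only a successful latest run lets DAG1 skip its remaining tasks;
    # failed, running, or missing (None) states all run them.
    return SKIP_TASK if latest == "success" else RUN_TASK
```

For example, `choose_branch(["success", "failed"])` returns `"mark_success"`, while `choose_branch([])` and `choose_branch(["failed"])` return the run-remaining-tasks id.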

I have created a branch operator to do this, but I am not able to fetch the last instance of task1 from DAG2, because I cannot correctly determine the execution_date for task1.


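from datetime import timedelta

from airflow.configuration import conf
from airflow.models import DagBag, TaskInstance
from airflow.operators.python_operator import BranchPythonOperator


# The callable must be defined before the operator that references it.
def branchTo(**kwargs):
    # Note: subtracting timedelta(hours=-1) *adds* one hour to execution_date.
    date = kwargs['execution_date'] - timedelta(hours=-1)
    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags['DAG2']
    my_task = check_dag.get_task('task1')
    # Looks up DAG2.task1 at exactly `date`; a manual run only matches
    # if its execution_date is exactly that value.
    ti = TaskInstance(my_task, date)
    state = ti.current_state()
    if state != 'success':
        return 'procees_with_remaining_tasks'
    else:
        return 'mark_success'


branching = BranchPythonOperator(
    task_id='branching',
    python_callable=branchTo,
    provide_context=True,  # Airflow 1.10.x: pass execution_date etc. as kwargs
    dag=dag)  # `dag` is DAG1, defined elsewhere in the file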

Since it is hard to get the manually triggered task instance, I am querying a time range to fetch all task instances executed for a given dag_id and task_id, and using the output for the conditional jump.

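from airflow.models import TaskInstance
from airflow.utils import timezone
from airflow.utils.db import provide_session  # Airflow 2.x: airflow.utils.session


@provide_session
def get_task_instances(dag_id, task_id, start_date=None, end_date=None, session=None):
    TI = TaskInstance
    # Airflow stores timezone-aware datetimes, so use its aware utcnow()
    end_date = end_date or timezone.utcnow()
    query = session.query(TI).filter(
        TI.dag_id == dag_id,
        TI.task_id == task_id,
        TI.execution_date <= end_date,
    )
    # Only add a lower bound when one is given (a NULL comparison matches no rows)
    if start_date is not None:
        query = query.filter(TI.execution_date >= start_date)
    # Newest first, so task_instances[0] is the latest run
    task_instances = query.order_by(TI.execution_date.desc()).all()
    return task_instances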
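A manually triggered run gets the trigger time as its execution_date rather than a fixed schedule slot, so an exact-date lookup rarely matches; the range query above instead needs a start and end for "today". A minimal sketch, assuming "today" means the UTC calendar day of DAG1's (timezone-aware) execution_date; the helper name `day_bounds` is hypothetical.

```python
from datetime import datetime, time, timedelta, timezone
from typing import Tuple


def day_bounds(execution_date: datetime) -> Tuple[datetime, datetime]:
    """Return (start, end) covering the UTC calendar day of execution_date.

    Hypothetical helper; expects a timezone-aware datetime.
    """
    # Normalise to UTC so the day boundary does not depend on the input's zone
    utc = execution_date.astimezone(timezone.utc)
    start = datetime.combine(utc.date(), time.min, tzinfo=timezone.utc)
    # The query uses an inclusive <= upper bound, so stop just before midnight
    end = start + timedelta(days=1) - timedelta(microseconds=1)
    return start, end
```

Inside the branch callable, something like `start, end = day_bounds(kwargs['execution_date'])` followed by `get_task_instances('DAG2', 'task1', start_date=start, end_date=end)` would return today's DAG2 task1 instances, newest first; the first element's `state` is then the one to branch on.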

Upvotes: 4

Views: 3410

Answers (1)

Elad Kalif

Reputation: 15931

You can use BranchDateTimeOperator at the beginning of your workflow to check the current time and, based on it, either continue executing the workflow or branch to an end task.

import datetime

from airflow.operators.datetime import BranchDateTimeOperator

# Follow continue_task when the time is between 10:00 and 10:30, else end_task
cond = BranchDateTimeOperator(
    task_id='datetime_branch',
    follow_task_ids_if_true=['continue_task'],
    follow_task_ids_if_false=['end_task'],
    target_upper=datetime.time(10, 30, 0),
    target_lower=datetime.time(10, 0, 0),
    dag=dag,
)
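To reason about which moments fall inside that window, here is a plain-Python sketch of a lower/upper time-of-day test. It is not the operator's implementation: treating both bounds as inclusive and ignoring timezones are assumptions, so check the BranchDateTimeOperator documentation for its exact boundary and timezone behaviour. The helper names `in_window` and `pick_task` are hypothetical.

```python
import datetime


def in_window(now: datetime.datetime,
              lower: datetime.time = datetime.time(10, 0, 0),
              upper: datetime.time = datetime.time(10, 30, 0)) -> bool:
    """True when the time of day of `now` lies in [lower, upper] (assumed inclusive)."""
    return lower <= now.time() <= upper


def pick_task(now: datetime.datetime) -> str:
    # Same task ids as the answer: continue inside the window, otherwise end
    return "continue_task" if in_window(now) else "end_task"
```

With these bounds, a run at 10:15 goes to `continue_task`, while runs at 09:59 or 10:31 go to `end_task`.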

Upvotes: 4
