Blessy

Reputation: 520

Programmatically clear the state of Airflow task instances

I want to clear the tasks in DAG B when DAG A completes execution. Both A and B are scheduled DAGs.

Is there any operator/way to clear the state of tasks and re-run DAG B programmatically?


I'm aware of the CLI option and Web UI option to clear the tasks.

Upvotes: 3

Views: 13574

Answers (4)

phenderbender

Reputation: 785

It is possible, but I would be careful about getting into an endless loop of retries if the task never succeeds. You can call a bash command within the on_failure_callback (or on_retry_callback) and specify which tasks/DAG runs you want to clear.

This works in Airflow 2.0; note that the clear commands changed in 2.0:

https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clear

In this example, I clear t2 and its downstream tasks when t3 eventually fails:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


def clear_upstream_task(context):
    execution_date = context.get("execution_date")
    # -s: start date, -t: task regex, -d: include downstream, -y: skip confirmation;
    # the trailing positional argument is the dag_id
    clear_tasks = BashOperator(
        task_id='clear_tasks',
        bash_command=f'airflow tasks clear -s {execution_date} -t t2 -d -y clear_upstream_task'
    )
    return clear_tasks.execute(context=context)


# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}


with DAG('clear_upstream_task',
         start_date=datetime(2021, 1, 1),
         max_active_runs=3,
         schedule_interval=timedelta(minutes=5),
         default_args=default_args,
         catchup=False
         ) as dag:
    t0 = DummyOperator(
        task_id='t0'
    )

    t1 = DummyOperator(
        task_id='t1'
    )

    t2 = DummyOperator(
        task_id='t2'
    )
    t3 = BashOperator(
        task_id='t3',
        bash_command='exit 123',
        #retries=1,
        on_failure_callback=clear_upstream_task
    )

    t0 >> t1 >> t2 >> t3
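
To avoid the endless clear/retry loop mentioned at the top, one hedged option (not part of the original answer) is to stop clearing once the failed task instance has accumulated a few attempts; try_number is exposed on the task instance in the callback context:

def clear_downstream_guarded(context):
    # Arbitrary cap (an assumption); try_number keeps growing across clears,
    # so this eventually lets the failure stand instead of clearing forever
    if context["task_instance"].try_number > 3:
        return
    clear_upstream_task(context)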

Upvotes: 1

Mathias Andersen

Reputation: 526

I would recommend staying away from the CLI here!

Airflow's DAG/task functionality is much better exposed by referencing the objects directly than by going through the BashOperator and/or the CLI module.

Add a Python task to DAG A named "clear_dag_b" that imports dag_b from the dags folder (module) and runs this:

from dags.dag_b import dag as dag_b

def clear_dag_b(**context):
    exec_date = context["execution_date"]
    dag_b.clear(start_date=exec_date, end_date=exec_date)

Important! If you do not match or overlap dag_b's scheduled time with start_date/end_date, the clear() operation will miss the DAG executions. This example assumes DAGs A and B are scheduled identically, and that you only want to clear day X from B when A executes day X.

It might make sense to check whether dag_b has already run before clearing:

dag_b_run = dag_b.get_dagrun(exec_date)  # returns None or a DagRun object
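
For completeness, a minimal sketch of how this might be wired into DAG A, combining the clear with the dag-run check. The dag_a handle and the task id are assumptions, not part of the original answer; on Airflow 1.x the PythonOperator also needs provide_context=True:

from airflow.operators.python_operator import PythonOperator
from dags.dag_b import dag as dag_b

def clear_dag_b(**context):
    exec_date = context["execution_date"]
    # Only clear if DAG B has actually run for this execution date
    if dag_b.get_dagrun(exec_date):
        dag_b.clear(start_date=exec_date, end_date=exec_date)

clear_dag_b_task = PythonOperator(
    task_id='clear_dag_b',
    python_callable=clear_dag_b,
    dag=dag_a,  # assumed handle to DAG A
)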

Upvotes: 3

Blessy

Reputation: 520

Since my objective was to re-run DAG B whenever DAG A completed execution, I ended up clearing DAG B using a BashOperator:

# Clear the tasks in another dag (Airflow 1.x CLI; -c skips the confirmation prompt)
last_task = BashOperator(
    task_id='last_task',
    bash_command='airflow clear example_target_dag -c',
    dag=dag)

first_task >> last_task
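
Note that airflow clear is the Airflow 1.x CLI. On 2.x the subcommands were reorganized, so an equivalent (assuming the same target DAG id) would look something like this:

# Airflow 2.x: `airflow clear` became `airflow tasks clear`,
# and -c/--no_confirm became -y/--yes
last_task = BashOperator(
    task_id='last_task',
    bash_command='airflow tasks clear example_target_dag -y',
    dag=dag)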

Upvotes: 2

y2k-shubham

Reputation: 11607

  • cli.py is an incredibly useful place to peep into the SQLAlchemy magic of Airflow.
  • The clear command is implemented here:
@cli_utils.action_logging
def clear(args):
    logging.basicConfig(
        level=settings.LOGGING_LEVEL,
        format=settings.SIMPLE_LOG_FORMAT)
    dags = get_dags(args)

    if args.task_regex:
        for idx, dag in enumerate(dags):
            dags[idx] = dag.sub_dag(
                task_regex=args.task_regex,
                include_downstream=args.downstream,
                include_upstream=args.upstream)

    DAG.clear_dags(
        dags,
        start_date=args.start_date,
        end_date=args.end_date,
        only_failed=args.only_failed,
        only_running=args.only_running,
        confirm_prompt=not args.no_confirm,
        include_subdags=not args.exclude_subdags,
        include_parentdag=not args.exclude_parentdag,
    )
  • Looking at the source, you can either
    • replicate it (assuming you also want to modify the functionality a bit)
    • or maybe just do from airflow.bin import cli and invoke the required functions directly, as sketched below
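
For the second option, a hedged sketch assuming the Airflow 1.x layout shown above, where clear() takes the parsed argparse namespace (the DAG id and dates are placeholders):

from airflow.bin import cli

# Build the same argparse namespace the CLI would produce,
# then call clear() directly
parser = cli.CLIFactory.get_parser()
args = parser.parse_args([
    'clear', 'example_target_dag',
    '-s', '2021-01-01',
    '-c',  # --no_confirm
])
cli.clear(args)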

Upvotes: 2
