Reputation: 520
I want to clear the tasks in DAG B when DAG A completes execution. Both A and B are scheduled DAGs.
Is there any operator/way to clear the state of tasks and re-run DAG B programmatically?
I'm aware of the CLI option and Web UI option to clear the tasks.
Upvotes: 3
Views: 13574
Reputation: 785
It is possible, but I would be careful about getting into an endless loop of retries if the task never succeeds. You can run a bash command from a task callback (on_retry_callback, or on_failure_callback as in the example below) and specify which tasks/DAG runs you want to clear.
This works in Airflow 2.0, where the clear command changed to "airflow tasks clear":
https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clear
In this example, I am clearing t2 and its downstream tasks when t3 eventually fails:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

def clear_upstream_task(context):
    execution_date = context.get("execution_date")
    clear_tasks = BashOperator(
        task_id='clear_tasks',
        bash_command=f'airflow tasks clear -s {execution_date} -t t2 -d -y clear_upstream_task'
    )
    return clear_tasks.execute(context=context)

# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

with DAG('clear_upstream_task',
         start_date=datetime(2021, 1, 1),
         max_active_runs=3,
         schedule_interval=timedelta(minutes=5),
         default_args=default_args,
         catchup=False
         ) as dag:

    t0 = DummyOperator(
        task_id='t0'
    )
    t1 = DummyOperator(
        task_id='t1'
    )
    t2 = DummyOperator(
        task_id='t2'
    )
    t3 = BashOperator(
        task_id='t3',
        bash_command='exit 123',
        #retries=1,
        on_failure_callback=clear_upstream_task
    )

    t0 >> t1 >> t2 >> t3
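If the command string gets more involved, it may be easier to assemble it from a list of arguments than to write it inline in an f-string. The following is only a sketch: build_clear_command is a hypothetical helper name, it uses only the flags that appear in the command above (-s, -t, -d, -y), and it relies on shlex.join (Python 3.8+) to quote anything the shell would otherwise interpret, such as a task regex containing "|".

```python
import shlex
from datetime import datetime, timezone


def build_clear_command(dag_id, start_date, task_regex=None, downstream=False):
    """Build an `airflow tasks clear` command line (Airflow 2.x syntax).

    Hypothetical helper, not part of Airflow itself.
    """
    args = ["airflow", "tasks", "clear", "-s", start_date.isoformat()]
    if task_regex:
        args += ["-t", task_regex]   # only clear tasks matching this regex
    if downstream:
        args.append("-d")            # also clear downstream tasks
    args += ["-y", dag_id]           # -y skips the confirmation prompt
    return shlex.join(args)          # quotes arguments where the shell needs it


cmd = build_clear_command(
    "clear_upstream_task",
    datetime(2021, 1, 1, tzinfo=timezone.utc),
    task_regex="t2",
    downstream=True,
)
print(cmd)
```

The resulting string could then be passed as bash_command in the callback above.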
Upvotes: 1
Reputation: 526
I would recommend staying away from CLI here!
Airflow's DAG/task functionality is much better exposed when you reference the objects directly than when you go through BashOperator and/or the CLI module.
Add a Python task to DAG A named "clear_dag_b" that imports dag_b from the dags folder (module) and does this:
from dags.dag_b import dag as dag_b

def clear_dag_b(**context):
    # Date of the current DAG A run; "execution_date" is the context key in Airflow 1.10/2.x
    exec_date = context["execution_date"]
    dag_b.clear(start_date=exec_date, end_date=exec_date)
Important! If start_date/end_date do not match or overlap the schedule times of dag_b, the clear() operation will miss its DAG runs. This example assumes DAG A and DAG B are scheduled identically, and that you only want to clear day X of B when A executes day X.
It might make sense to check whether dag_b has already run before clearing:
dag_b_run = dag_b.get_dagrun(exec_date)  # returns None or a dag_run object
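To illustrate the warning about matching start_date/end_date, here is a small standalone sketch that does not use Airflow at all. It assumes the clear window is an inclusive range over run dates, and runs_in_window is a hypothetical helper that just lists which of a DAG's run dates fall inside that range.

```python
from datetime import datetime, timedelta


def runs_in_window(first_run, interval, count, start_date, end_date):
    """Return the run dates that fall inside [start_date, end_date]."""
    runs = [first_run + i * interval for i in range(count)]
    return [r for r in runs if start_date <= r <= end_date]


day = timedelta(days=1)
a_date = datetime(2021, 1, 2)  # execution date of the DAG A run

# DAG B on the same schedule as DAG A: the exact-date window matches one run.
same = runs_in_window(datetime(2021, 1, 1), day, 3, a_date, a_date)

# DAG B offset by six hours: the exact-date window matches nothing.
b_first = datetime(2021, 1, 1, 6)
missed = runs_in_window(b_first, day, 3, a_date, a_date)

# Widening the window to the whole day picks up the offset run.
widened = runs_in_window(b_first, day, 3, a_date, a_date + day)

print(same, missed, widened)
```

If the two schedules differ, widening end_date as in the last call is one way to make sure the intended run of DAG B is included.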
Upvotes: 3
Reputation: 520
Since my objective was to re-run DAG B whenever DAG A completes execution, I ended up clearing DAG B using a BashOperator:
# Clear the tasks in another dag
last_task = BashOperator(
    task_id='last_task',
    bash_command='airflow clear example_target_dag -c ',
    dag=dag)

first_task >> last_task
Upvotes: 2
Reputation: 11607
cli.py is an incredibly useful place to peep into the SQLAlchemy magic of Airflow.
The clear command is implemented here:

@cli_utils.action_logging
def clear(args):
    logging.basicConfig(
        level=settings.LOGGING_LEVEL,
        format=settings.SIMPLE_LOG_FORMAT)
    dags = get_dags(args)

    if args.task_regex:
        for idx, dag in enumerate(dags):
            dags[idx] = dag.sub_dag(
                task_regex=args.task_regex,
                include_downstream=args.downstream,
                include_upstream=args.upstream)

    DAG.clear_dags(
        dags,
        start_date=args.start_date,
        end_date=args.end_date,
        only_failed=args.only_failed,
        only_running=args.only_running,
        confirm_prompt=not args.no_confirm,
        include_subdags=not args.exclude_subdags,
        include_parentdag=not args.exclude_parentdag,
    )

You can import it with
from airflow.bin import cli
and invoke the required functions directly.
Upvotes: 2