Reputation: 611
I have this simple Airflow DAG:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator

with DAG("Second Dag",
         start_date=datetime(2022,1,1),
         schedule_interval="0 5 * * *",
         catchup=False,
         max_active_runs=1
         ) as dag:
    task_a = BashOperator(
        task_id="ToRepeat",
        bash_command="cd /home/xdf/local/ && (env/bin/python workflow/test1.py)",
        retries=1,
    )
The task's run time varies from one run to the next, and I have no guarantee that it will finish by 5 A.M. the next day. If the task is still running when a new run is scheduled to start, I need to kill the old one before the new one starts.
How can I design an Airflow DAG to automatically kill the old task if it is still running when a new one is scheduled to start?
More details: I am looking for something dynamic. The old DAG run should be killed only when the new DAG run is starting. If, for any reason, the new run does not start for one week, then the old run should be able to run for the entire week. That's why using a timeout is sub-optimal.
Upvotes: 2
Views: 1757
Reputation: 254
If you really are looking for a dynamic solution, you can use the Airflow DAG run REST API together with XComs. Push the current DAG run's run_id to XCom; on subsequent runs, pull that XCom and pass it to the Airflow API to check for and kill the DAG run with that run_id. The tasks would be chained like this:
check_previous_dag_run_id >> kill_previous_dag_run >> push_current_run_id >> your_main_task
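As a rough illustration of the push and check steps, here is a minimal sketch of two callables that could be wrapped in PythonOperator tasks. It assumes Airflow 2.x, where the callable receives ti and run_id from the task context and where xcom_pull accepts include_prior_dates=True to read values written by earlier runs. The XCom key name is hypothetical, and the sketch also assumes max_active_runs is high enough for the new run to start while the old one is still active.

```python
# Hypothetical names: the XCom key is made up for this sketch; the task id
# matches the chain above.
XCOM_KEY = "run_id"
PUSH_TASK_ID = "push_current_run_id"


def push_current_run_id(ti, run_id, **_):
    """Store this run's run_id in XCom so the next run can find it."""
    ti.xcom_push(key=XCOM_KEY, value=run_id)


def check_previous_dag_run_id(ti, **_):
    """Return the run_id pushed by an earlier run, or None if nothing was pushed.

    include_prior_dates=True lets xcom_pull look at earlier runs; by default
    it only searches the current run.
    """
    return ti.xcom_pull(
        task_ids=PUSH_TASK_ID,
        key=XCOM_KEY,
        include_prior_dates=True,
    )
```

Because check_previous_dag_run_id returns the value, a PythonOperator wrapping it would publish the previous run_id as its own XCom return value, which the kill task could then read.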
The API call task should look something like this:
...
kill_previous_dag_run = BashOperator(
    task_id="kill_previous_dag_run",
    bash_command="curl -X 'DELETE' \
    'http://<<your_webserver_dns>>/api/v1/dags/<<your_dag_name>>/dagRuns/<<url_encoded_run_id>>' \
    -H 'accept: */*' --user <<api_username>>:<<api_user_password>>",
    dag=dag
)
...
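The <<url_encoded_run_id>> placeholder matters because run IDs of scheduled runs typically contain characters such as ":" and "+", which need percent-encoding inside a URL path. A minimal sketch of building that URL with the standard library follows; the helper name dag_run_url and the host and DAG name in the usage note are hypothetical.

```python
from urllib.parse import quote


def dag_run_url(base_url: str, dag_id: str, run_id: str) -> str:
    """Build the REST API URL for one DAG run.

    Both path segments are percent-encoded with safe="" so that characters
    such as ":", "+" and "/" cannot change the meaning of the URL path.
    """
    return (
        f"{base_url.rstrip('/')}/api/v1/dags/"
        f"{quote(dag_id, safe='')}/dagRuns/{quote(run_id, safe='')}"
    )
```

For example, dag_run_url("http://localhost:8080", "second_dag", "scheduled__2022-01-01T05:00:00+00:00") yields a path ending in scheduled__2022-01-01T05%3A00%3A00%2B00%3A00.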
Upvotes: 0
Reputation: 15931
You should set dagrun_timeout for your DAG.
dagrun_timeout: specify how long a DagRun should be up before
timing out / failing, so that new DagRuns can be created. The timeout
is only enforced for scheduled DagRuns.
Since your DAG runs daily, you can set the timeout to 24 hours.
with DAG("Second Dag",
         start_date=datetime(2022,1,1),
         schedule_interval="0 5 * * *",
         catchup=False,
         max_active_runs=1,
         dagrun_timeout=timedelta(hours=24)
         ) as dag:
If you want to set a timeout on a specific task in your DAG, use execution_timeout on your operator.
execution_timeout: max time allowed for the execution of this task instance, if it goes beyond it will raise and fail
Example:
MyOperator(task_id='task', execution_timeout=timedelta(hours=24))
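Applied to the task from the question, this could look roughly like the sketch below. The TASK_KWARGS dictionary and the make_task helper are hypothetical names introduced for illustration, and the 24-hour value simply mirrors the daily schedule. Airflow is imported inside the helper so the settings can be inspected without an Airflow installation.

```python
from datetime import timedelta

# Keyword arguments for the question's task, plus a 24-hour execution_timeout.
TASK_KWARGS = {
    "task_id": "ToRepeat",
    "bash_command": "cd /home/xdf/local/ && (env/bin/python workflow/test1.py)",
    "retries": 1,
    "execution_timeout": timedelta(hours=24),
}


def make_task(dag):
    """Create the BashOperator on the given DAG (requires Airflow 2.x)."""
    # Imported lazily so that this module loads even where Airflow is absent.
    from airflow.operators.bash import BashOperator

    return BashOperator(dag=dag, **TASK_KWARGS)
```

Inside the with DAG(...) block, calling make_task(dag) would replace the original BashOperator definition.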
Upvotes: 2