Tommaso Bendinelli

Reputation: 611

How to force an Airflow Task to restart at the new scheduling date?

I have this simple Airflow DAG:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator
with DAG("Second Dag", 
        start_date=datetime(2022,1,1), 
        schedule_interval="0 5 * * *", 
        catchup=False,
        max_active_runs=1
        ) as dag:

    task_a = BashOperator(
        task_id="ToRepeat",
        bash_command="cd /home/xdf/local/ && (env/bin/python workflow/test1.py)",
        retries =1,
    )

The task takes a variable amount of time from one run to the next, and I have no guarantee that it will finish by 5 A.M. the next day. If the task is still running when a new run is scheduled to start, I need to kill the old one before the new one starts.

How can I design the Airflow DAG to automatically kill the old task if it's still running when a new run is scheduled to start?

More details: I am looking for something dynamic. The old DAG run should be killed only when the new one is starting. If, for any reason, the new DAG run does not start for a week, then the old one should be able to run for that entire week. That's why using a timeout is sub-optimal.

Upvotes: 2

Views: 1757

Answers (2)

Kaish kugashia

Reputation: 254

If you really are looking for a dynamic solution, you can use Airflow's DAG Run REST API together with XComs: push the current DAG run's run_id to XCom, and on subsequent runs pull that XCom and call the API to check for and kill the DAG run with that run_id.

check_previous_dag_run_id >> kill_previous_dag_run >> push_current_run_id >> your_main_task

and your API call task could look something like this:

...
kill_previous_dag_run = BashOperator(
    task_id="kill_previous_dag_run",
    # Delete the previous DAG run through Airflow's stable REST API.
    bash_command="curl -X 'DELETE' \
        'http://<<your_webserver_dns>>/api/v1/dags/<<your_dag_name>>/dagRuns/<<url_encoded_run_id>>' \
        -H 'accept: */*' --user <<api_username>>:<<api_user_password>>",
    dag=dag,
)
...
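
The chain above also needs the push and check tasks, which the answer does not show. A minimal sketch of what they could look like, assuming PythonOperator tasks and XCom's include_prior_dates flag to read the value written by the previous run (the helper names here are illustrative, not from the answer):

from airflow.operators.python import PythonOperator

def _push_current_run_id(ti, run_id, **_):
    # Record this run's id so the *next* run can find and kill this one.
    ti.xcom_push(key="run_id_to_kill", value=run_id)

def _check_previous_dag_run_id(ti, **_):
    # XComs are scoped to a single DAG run; include_prior_dates=True
    # also searches XComs pushed by earlier runs.
    return ti.xcom_pull(
        task_ids="push_current_run_id",
        key="run_id_to_kill",
        include_prior_dates=True,
    )

check_previous_dag_run_id = PythonOperator(
    task_id="check_previous_dag_run_id",
    python_callable=_check_previous_dag_run_id,
    dag=dag,
)

push_current_run_id = PythonOperator(
    task_id="push_current_run_id",
    python_callable=_push_current_run_id,
    dag=dag,
)

The kill task can then consume the checked run_id via templating, e.g. {{ ti.xcom_pull(task_ids='check_previous_dag_run_id') }} in place of <<url_encoded_run_id>>, URL-encoding it first since run_ids contain characters such as ':' and '+'.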

Upvotes: 0

Elad Kalif

Reputation: 15931

You should set dagrun_timeout for your DAG.

dagrun_timeout: specify how long a DagRun should be up before
timing out / failing, so that new DagRuns can be created. The timeout
is only enforced for scheduled DagRuns.

Since your DAG runs daily, you can set the timeout to 24 hours.

with DAG("Second Dag", 
        start_date=datetime(2022,1,1), 
        schedule_interval="0 5 * * *", 
        catchup=False,
        max_active_runs=1
        dagrun_timeout=timedelta(hours=24)
        ) as dag:

If you want to set a timeout on a specific task in your DAG, you should use execution_timeout on your operator.

execution_timeout: max time allowed for the execution of this task instance, if it goes beyond it will raise and fail

Example:

MyOperator(task_id='task', execution_timeout=timedelta(hours=24))
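
Applied to the task from the question, it would look like this (a sketch; when the timeout is exceeded Airflow raises AirflowTaskTimeout and fails the task instance, which retries=1 would then retry once):

task_a = BashOperator(
    task_id="ToRepeat",
    bash_command="cd /home/xdf/local/ && (env/bin/python workflow/test1.py)",
    retries=1,
    # Kill this task instance if it is still running after 24 hours.
    execution_timeout=timedelta(hours=24),
)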

Upvotes: 2
