gorros

Reputation: 1461

How to retry complete Airflow DAG?

I know that it is possible to retry individual tasks, but is it possible to retry a complete DAG?

I create tasks dynamically, which is why I need to retry the complete DAG rather than a specific task. If this is not supported by Airflow, is there a workaround?

Upvotes: 3

Views: 1476

Answers (3)

Hemanth Gowda

Reputation: 620

I wrote the script below and scheduled it on the Airflow master node. It re-runs the failed DAG runs of every DAG listed in the "dag_ids_to_monitor" list. It uses the Airflow 1.x CLI commands "airflow list_dag_runs" and "airflow clear".

import subprocess
import re
from datetime import datetime

# Placeholder DAG IDs: the DAGs whose failed runs should be re-run
dag_ids_to_monitor = ['dag1', 'dag2', 'dag3']


def run_bash(cmd):
    """Run a command and return its stdout as text."""
    print("running bash command {}".format(cmd))
    # check_output returns bytes in Python 3, so decode before splitting lines
    return subprocess.check_output(cmd.split()).decode('utf-8')


def datetime_valid(dt_str):
    """Return True if dt_str looks like an execution date such as 2020-01-01T00:00:00."""
    try:
        datetime.strptime(dt_str, '%Y-%m-%dT%H:%M:%S')
    except ValueError:
        return False
    return True


def get_schedules_to_rerun(dag_id):
    """Collect the execution dates of the failed runs of dag_id."""
    output = run_bash(f"airflow list_dag_runs --state failed {dag_id}")

    schedules_to_rerun = []
    for line in output.split('\n'):
        # The CLI prints a table whose columns are separated by '|'
        parts = re.split(r"\s*\|\s*", line)
        # parts[3] is expected to hold the execution date; [:-6] drops the
        # "+00:00" UTC offset before validating, which also skips header rows
        if len(parts) > 4 and datetime_valid(parts[3][:-6]):
            schedules_to_rerun.append(parts[3])
    return schedules_to_rerun


def trigger_runs(dag_id, re_run_start_times):
    """Clear all tasks of each failed run so that the scheduler runs it again."""
    for start_time in re_run_start_times:
        run_bash(f"airflow clear --no_confirm --start_date {start_time} --end_date {start_time} {dag_id}")


def rerun_failed_dag_runs(dag_id):
    re_run_start_times = get_schedules_to_rerun(dag_id)
    trigger_runs(dag_id, re_run_start_times)


for dag_id in dag_ids_to_monitor:
    rerun_failed_dag_runs(dag_id)
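In Airflow 2.x these commands were renamed to "airflow dags list-runs" and "airflow tasks clear". A minimal sketch of the same idea for Airflow 2.x follows, assuming that "list-runs" accepts "-d", "--state" and "-o json", that each JSON record carries an "execution_date" field, and that "tasks clear" accepts "--yes", "--start-date" and "--end-date" (verify with "--help" on your version). The helper names are hypothetical. Passing argument lists to subprocess, instead of splitting a string, also keeps timestamps and DAG IDs intact.

```python
import json
import subprocess

# Hypothetical DAG IDs; replace with the DAGs you want to watch
DAG_IDS_TO_MONITOR = ["dag1", "dag2", "dag3"]


def list_failed_runs_json(dag_id):
    """Return the CLI's JSON listing of failed runs (Airflow 2.x CLI assumed)."""
    return subprocess.check_output(
        ["airflow", "dags", "list-runs", "-d", dag_id, "--state", "failed", "-o", "json"],
        text=True,
    )


def build_clear_commands(dag_id, list_runs_json):
    """Turn the JSON run listing into one 'airflow tasks clear' command per failed run."""
    text = list_runs_json.strip()
    if not text.startswith("["):
        # Assumption: with no matching runs the CLI may print a message instead of "[]"
        return []
    commands = []
    for run in json.loads(text):
        # Clearing the window [date, date] resets every task of that single run
        date = run["execution_date"]
        commands.append([
            "airflow", "tasks", "clear", "--yes",
            "--start-date", date, "--end-date", date,
            dag_id,
        ])
    return commands


def rerun_failed_dag_runs(dag_id):
    """List the failed runs of dag_id and clear each of them."""
    for cmd in build_clear_commands(dag_id, list_failed_runs_json(dag_id)):
        print("running", " ".join(cmd))
        subprocess.check_call(cmd)


# Usage (requires the airflow CLI on PATH):
# for dag_id in DAG_IDS_TO_MONITOR:
#     rerun_failed_dag_runs(dag_id)
```

Keeping the parsing in "build_clear_commands" separate from the subprocess calls makes it possible to check the parsing against a saved CLI output without a running Airflow installation.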

Upvotes: 1

Artem Vovsia

Reputation: 1570

Go to the Airflow UI and click on the first task(s) of your DAG run. To the right of the "Clear" button, select "Downstream" and "Recursive", then press "Clear". This resets the cleared tasks to "haven't yet run", so the DAG run is executed again if the DAG schedule permits it.
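Clearing with "Downstream" resets the selected task plus everything reachable from it, which is why clearing the first task(s) is enough to rerun the whole DAG. The sketch below models only the "Downstream" part as a plain graph traversal over a hypothetical task-dependency mapping; it is not Airflow code. It also shows why the answer says "task(s)": a DAG with several root tasks needs every root cleared.

```python
from collections import deque


def root_tasks(dependencies):
    """Tasks with no upstream task, i.e. the 'first tasks' of the DAG."""
    all_tasks = set(dependencies)
    for children in dependencies.values():
        all_tasks.update(children)
    has_upstream = {child for children in dependencies.values() for child in children}
    return all_tasks - has_upstream


def downstream_closure(dependencies, start_tasks):
    """Return start_tasks plus every task reachable from them via downstream edges.

    dependencies maps each task ID to the IDs of its direct downstream tasks.
    """
    seen = set(start_tasks)
    queue = deque(start_tasks)
    while queue:
        task = queue.popleft()
        for child in dependencies.get(task, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


# Hypothetical DAG with two roots: "extract" and "audit"
deps = {
    "extract": ["transform_a", "transform_b"],
    "transform_a": ["load"],
    "transform_b": ["load"],
    "audit": [],
}
```

Here clearing only "extract" downstream leaves "audit" untouched, while clearing every task returned by "root_tasks(deps)" covers the whole DAG.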

Upvotes: 0

Meghdeep Ray

Reputation: 5537

If you have access to the Airflow UI, go to the Tree view.

In the Tree view, individual task instances are shown as squares and each DAG run as a whole is shown as a circle. Click on a circle and then on the "Clear" option. This restarts the entire run.

Alternatively, you can clear the first task of the DAG in the Tree view, with "Downstream" selected so that the rest of the run is cleared as well.
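The same "Clear" action on a DAG run can also be triggered without the UI. A minimal sketch, assuming Airflow 2.4 or later (where the stable REST API has a "clear DAG run" endpoint, POST /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/clear) and basic authentication enabled on the webserver. The helper "build_clear_dag_run_request" is hypothetical; it only builds the request so it can be inspected before sending.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_clear_dag_run_request(base_url, dag_id, dag_run_id, username, password, dry_run=False):
    """Build (but do not send) a request that clears every task of one DAG run.

    Assumes Airflow 2.4+ with the stable REST API and basic auth enabled.
    """
    # Run IDs such as "scheduled__2020-01-01T00:00:00+00:00" contain ':' and '+',
    # so both path segments are percent-encoded
    url = "{}/api/v1/dags/{}/dagRuns/{}/clear".format(
        base_url.rstrip("/"),
        urllib.parse.quote(dag_id, safe=""),
        urllib.parse.quote(dag_run_id, safe=""),
    )
    # The endpoint treats a missing dry_run as true, so set it explicitly
    body = json.dumps({"dry_run": dry_run}).encode("utf-8")
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
    )


# Usage (sends the request; requires a reachable Airflow webserver):
# req = build_clear_dag_run_request("http://localhost:8080", "dag1",
#                                   "scheduled__2020-01-01T00:00:00+00:00", "admin", "admin")
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp))
```

Setting dry_run=True instead returns the list of task instances that would be cleared without changing anything, which is a safe way to check the run ID first.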

Upvotes: 0
