Reputation: 1461
I know that it is possible to retry individual tasks, but is it possible to retry a complete DAG?
I create tasks dynamically, which is why I need to retry the complete DAG rather than a specific task. If Airflow does not support this, is there a workaround?
Upvotes: 3
Views: 1476
Reputation: 620
I wrote the script below and scheduled it on the Airflow master to rerun the failed DAG runs of the DAGs listed in the "dag_ids_to_monitor" array:
import re
import subprocess
from datetime import datetime

# DAGs whose failed runs should be cleared and rerun
dag_ids_to_monitor = ['dag1', 'dag2', 'dag3']

def runBash(cmd):
    print("running bash command {}".format(cmd))
    # check_output returns bytes on Python 3, so decode before parsing
    return subprocess.check_output(cmd.split()).decode()

def datetime_valid(dt_str):
    try:
        datetime.strptime(dt_str, '%Y-%m-%dT%H:%M:%S')
    except ValueError:
        return False
    return True

def get_schedules_to_rerun(dag_id):
    # Airflow 1.10-style CLI: prints a '|'-separated table of the failed runs
    output = runBash(f"airflow list_dag_runs --state failed {dag_id}")
    schedules_to_rerun = []
    for line in output.split('\n'):
        parts = re.split(r"\s*\|\s*", line)
        # the fourth column is expected to hold the run's date;
        # [:-6] drops the UTC offset (e.g. "+00:00") before validating it
        if len(parts) > 4 and datetime_valid(parts[3][:-6]):
            schedules_to_rerun.append(parts[3])
    return schedules_to_rerun

def trigger_runs(dag_id, re_run_start_times):
    # clearing a run's task instances makes the scheduler execute them again
    for start_time in re_run_start_times:
        runBash(f"airflow clear --no_confirm --start_date {start_time} --end_date {start_time} {dag_id}")

def rerun_failed_dag_runs(dag_id):
    re_run_start_times = get_schedules_to_rerun(dag_id)
    trigger_runs(dag_id, re_run_start_times)

for dag_id in dag_ids_to_monitor:
    rerun_failed_dag_runs(dag_id)
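The parsing step is the fragile part of the script, so it may be worth checking on its own before scheduling anything. Below is a minimal sketch that runs the same split-and-validate logic against a made-up sample table. The sample text and the names SAMPLE_OUTPUT, is_iso_datetime and extract_execution_dates are assumptions for illustration only; the real output of "airflow list_dag_runs" may be laid out differently depending on the Airflow version.

```python
import re
from datetime import datetime

# Hypothetical sample of the table printed by `airflow list_dag_runs`.
# The real column layout may differ between Airflow versions.
SAMPLE_OUTPUT = """\
------------------------------------------------------------
DAG RUNS
------------------------------------------------------------
id  | run_id                | state  | execution_date            | start_date

1   | scheduled__2020-01-01 | failed | 2020-01-01T00:00:00+00:00 | 2020-01-02T00:00:05+00:00
2   | scheduled__2020-01-02 | failed | 2020-01-02T00:00:00+00:00 | 2020-01-03T00:00:07+00:00
"""


def is_iso_datetime(text):
    """Return True if text looks like 2020-01-01T00:00:00 (no UTC offset)."""
    try:
        datetime.strptime(text, "%Y-%m-%dT%H:%M:%S")
    except ValueError:
        return False
    return True


def extract_execution_dates(table_text):
    """Collect the fourth column of every row that carries a valid date."""
    dates = []
    for line in table_text.splitlines():
        parts = re.split(r"\s*\|\s*", line.strip())
        # Header, separator and blank lines fail one of these two checks.
        if len(parts) > 4 and is_iso_datetime(parts[3][:-6]):
            dates.append(parts[3])
    return dates


if __name__ == "__main__":
    for date in extract_execution_dates(SAMPLE_OUTPUT):
        print(date)
```

If the table on your installation has the date in a different column, only the index in extract_execution_dates needs to change.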
Upvotes: 1
Reputation: 1570
Go to the Airflow UI and click on the first task(s) of your DAG. To the right of the "Clear" button, select "Downstream" and "Recursive", then press "Clear". This marks the DAG as "Haven't yet run" and reruns it if the DAG schedule permits.
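If you would rather do the same from the command line, something along these lines might work. It is only a sketch, assuming the Airflow 1.10 "airflow clear" command and its --task_regex, --downstream and --no_confirm options; the function name build_clear_command and the DAG, task and date values are made up for illustration, and it does not cover whatever "Recursive" adds in the UI.

```python
import re
import shlex


def build_clear_command(dag_id, first_task_id, execution_date):
    """Build the argument list that clears one task and everything downstream of it.

    Assumes the Airflow 1.10 CLI; newer versions use different subcommands.
    """
    return [
        "airflow", "clear",
        "--no_confirm",            # skip the interactive confirmation prompt
        "--downstream",            # include tasks downstream of the matched task
        "--task_regex", "^{}$".format(re.escape(first_task_id)),
        "--start_date", execution_date,
        "--end_date", execution_date,
        dag_id,
    ]


if __name__ == "__main__":
    # Hypothetical DAG id, task id and date, for illustration only.
    cmd = build_clear_command("dag1", "start", "2020-01-01T00:00:00+00:00")
    # Print a copy-pasteable shell command instead of executing it.
    print(" ".join(shlex.quote(arg) for arg in cmd))
```

Building the command as a list rather than splitting a string keeps task ids and dates intact if they ever contain spaces, and the list can be passed directly to subprocess.check_output.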
Upvotes: 0
Reputation: 5537
If you have access to the Airflow UI, go to the Graph view.
In the Graph view, individual tasks are shown as boxes and the DAG run as a whole is indicated by circles. Click on a circle and then on the Clear option. This will restart the entire run.
Alternatively, you can go to the Tree view and clear the first task in the DAG.
Upvotes: 0