I have a DAG that uses the operators below.
My use case: say the entire flow completed successfully, and then I found an issue with how the data was processed somewhere in between. I want to re-run the job for that particular execution date from the point of the issue. I cleared the downstream tasks, which makes the job re-run, but the TriggerDagRunOperator then fails with the following error:
airflow.exceptions.DagRunAlreadyExists: Run id triggered_ : already exists for dag id
I want to clear that run and re-run the triggered DAG for that particular execution date. Is there a better way to accomplish this?
You can use the reset_dag_run parameter of TriggerDagRunOperator:
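from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger = TriggerDagRunOperator(
    task_id="trigger",
    trigger_dag_id=dag_id_to_trigger,  # ID of the DAG to (re)run
    execution_date=trigger_date,       # logical date of the run to reset
    reset_dag_run=True,                # clear the existing run instead of failing
)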
reset_dag_run: whether to clear the existing DAG run if it already exists. This is useful when backfilling or rerunning an existing DAG run. It only resets (does not recreate) the DAG run. The DAG run conf is immutable and will not be reset on a rerun of an existing DAG run. When reset_dag_run=False and the DAG run exists, DagRunAlreadyExists is raised. When reset_dag_run=True and the DAG run exists, the existing DAG run is cleared so that it reruns.
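The rules above can be illustrated with a small, dependency-free toy model. It is not Airflow's implementation and does not touch a metadata database; `ToyTriggerRegistry`, `ToyDagRun` and the local `DagRunAlreadyExists` class are hypothetical stand-ins that only mimic the documented behaviour: raise when the run exists and reset_dag_run=False, otherwise clear the existing run in place and keep its original conf.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


class DagRunAlreadyExists(Exception):
    """Hypothetical stand-in for airflow.exceptions.DagRunAlreadyExists."""


@dataclass
class ToyDagRun:
    run_id: str
    conf: dict
    task_states: Dict[str, Optional[str]]
    state: str = "running"


class ToyTriggerRegistry:
    """Toy model of the reset_dag_run rules: one run per (dag_id, execution_date)."""

    def __init__(self) -> None:
        self.runs: Dict[Tuple[str, str], ToyDagRun] = {}

    def trigger(self, dag_id: str, execution_date: str, conf: dict,
                tasks: Tuple[str, ...], reset_dag_run: bool = False) -> ToyDagRun:
        key = (dag_id, execution_date)
        existing = self.runs.get(key)
        if existing is None:
            # No run for this date yet: create one with every task unscheduled.
            run = ToyDagRun(run_id=f"toy__{execution_date}", conf=dict(conf),
                            task_states={t: None for t in tasks})
            self.runs[key] = run
            return run
        if not reset_dag_run:
            # Mirrors the documented behaviour with reset_dag_run=False.
            raise DagRunAlreadyExists(
                f"Run id {existing.run_id} already exists for dag id {dag_id}")
        # reset_dag_run=True: reset the existing run in place (it is not recreated).
        # Task states go back to None and the run to "running"; conf is left untouched.
        for task in existing.task_states:
            existing.task_states[task] = None
        existing.state = "running"
        return existing


if __name__ == "__main__":
    registry = ToyTriggerRegistry()
    first = registry.trigger("child", "2021-01-01", {"source": "a"}, ("extract", "load"))
    first.task_states = {"extract": "success", "load": "success"}
    first.state = "success"
    try:
        registry.trigger("child", "2021-01-01", {"source": "b"}, ("extract", "load"))
    except DagRunAlreadyExists as exc:
        print(exc)  # Run id toy__2021-01-01 already exists for dag id child
    rerun = registry.trigger("child", "2021-01-01", {"source": "b"}, ("extract", "load"),
                             reset_dag_run=True)
    print(rerun is first, rerun.state, rerun.conf)  # True running {'source': 'a'}
```

The second trigger with reset_dag_run=True returns the same run object, now back in the "running" state, and still carrying the first conf, which matches the "resets, not recreates" and "conf is immutable" points.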
Airflow 2.0 brings a new version of the TriggerDagRunOperator that lets you rerun past DagRuns. It's much easier now than before!
The reset_dag_run parameter is exactly what you're looking for.
I've made a video about it, if that helps: https://youtu.be/8uKW0mPWmCk.
Have a good day!
I know it's been over a year, but an easier way to do this, without polluting your DAG with a BranchPythonOperator and a DummyOperator, is to use the python_callable parameter of TriggerDagRunOperator (available in Airflow 1.x; it was removed in Airflow 2.0) and put the logic inside that function.
For example:
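from airflow.models import DagBag, DagRun
from airflow.operators.dagrun_operator import TriggerDagRunOperator  # Airflow 1.10.x path

def conditionally_trigger(context, dro):
    dag_to_clear = dag_to_rerun.dag_id  # ID of the DAG being triggered
    execution_date = context['execution_date']
    child_dag = DagBag().get_dag(dag_to_clear)
    previous_dagrun = DagRun.find(dag_to_clear, execution_date=execution_date)
    if previous_dagrun:
        # A run already exists for this date: clear it so the scheduler re-runs it,
        # and return None so the operator does not create a new run.
        child_dag.clear(start_date=execution_date,
                        end_date=execution_date)
        return None
    # First trigger for this date: return the DagRunOrder so the operator triggers it.
    # run_id must be unique per DAG, so include the date in it.
    dro.run_id = 'First_time_triggered__' + execution_date.isoformat()
    return dro

trigger = TriggerDagRunOperator(
    task_id='trigger',
    dag=trigger_dag,
    python_callable=conditionally_trigger,
    trigger_dag_id=dag_to_rerun.dag_id,
)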
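Use the following steps:
1. Look up the previous DagRun for the execution date with DagRun.find().
2. If it exists, clear it with dag.clear(). There is no need to trigger a new DagRun, because a cleared DagRun is re-run automatically.
3. Otherwise, trigger the DAG as usual.

# Airflow 1.10.x import paths
import airflow
from airflow.models import DagRun
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

dag_to_rerun = ...  # the DAG that gets triggered / re-run
trigger_dag = ...   # the DAG that contains the tasks below

def delete_previous_dagrun_func(dag_to_delete: airflow.DAG, **context):
    execution_date = context.get('execution_date')
    previous_dagruns = DagRun.find(dag_id=dag_to_delete.dag_id,
                                   execution_date=execution_date)
    if previous_dagruns:
        # Clearing resets the existing run, which is then re-run automatically.
        dag_to_delete.clear(
            start_date=execution_date,
            end_date=execution_date,
        )
        return 'no_op'
    else:
        # No run yet for this date: follow the branch that triggers the DAG.
        return 'trigger'

delete_previous_dagrun = BranchPythonOperator(
    task_id='delete_previous_dagrun',
    dag=trigger_dag,
    python_callable=delete_previous_dagrun_func,
    op_args=(dag_to_rerun,),
    provide_context=True,  # Airflow 1.x: pass the task context as **kwargs
)

trigger = TriggerDagRunOperator(
    task_id='trigger',
    dag=trigger_dag,
    trigger_dag_id=dag_to_rerun.dag_id,
)

no_op = DummyOperator(
    task_id='no_op',
    dag=trigger_dag,
)

delete_previous_dagrun >> [trigger, no_op]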
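The branching decision in delete_previous_dagrun_func can be separated from the Airflow calls so it can be unit-tested without a metadata database. The sketch below is a hypothetical refactor, not part of the original answer: `choose_branch`, `find_runs` and `clear_runs` are illustrative names, and in a real DAG the two hooks would wrap `DagRun.find(...)` and `dag_to_delete.clear(...)` as in the code above.

```python
from datetime import datetime
from typing import Callable, Sequence


def choose_branch(execution_date: datetime,
                  find_runs: Callable[[datetime], Sequence[object]],
                  clear_runs: Callable[[datetime], None]) -> str:
    """Return the task_id the branch should follow for this execution date.

    find_runs and clear_runs are hypothetical hooks standing in for
    DagRun.find(...) and dag.clear(start_date=..., end_date=...).
    """
    if find_runs(execution_date):
        # A run already exists: clear it so it is re-run,
        # and skip the trigger to avoid DagRunAlreadyExists.
        clear_runs(execution_date)
        return "no_op"
    # No run yet for this date: follow the branch that triggers the DAG.
    return "trigger"


if __name__ == "__main__":
    existing_dates = {datetime(2021, 1, 1)}
    cleared = []

    def fake_find(d):
        return [d] if d in existing_dates else []

    print(choose_branch(datetime(2021, 1, 1), fake_find, cleared.append))  # no_op
    print(choose_branch(datetime(2021, 1, 2), fake_find, cleared.append))  # trigger
    print(cleared)  # [datetime.datetime(2021, 1, 1, 0, 0)]
```

Keeping the Airflow lookups behind two small callables is just one design choice; it keeps the returned task_ids ('no_op' / 'trigger') testable in isolation while the DAG wiring stays the same.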