Cdr

Reputation: 581

How to re-run a DAG that was already executed using TriggerDagRunOperator?

I have a DAG that uses the operators below.

My use case: say the entire flow completed successfully, and I then found an issue with the data processing somewhere in between. I want to re-run the job for that particular execution date from the point of the issue. I cleared the downstream tasks, which makes the job re-run, but TriggerDagRunOperator fails with the error below.

airflow.exceptions.DagRunAlreadyExists: Run id triggered_ : already exists for dag id

I want to clear that run and re-run the DAG for that particular execution date. Is there a better way to accomplish that?

Upvotes: 3

Views: 10405

Answers (4)

Zerzavot

Reputation: 362

You can use the reset_dag_run parameter:

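from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger = TriggerDagRunOperator(
    task_id="trigger",
    trigger_dag_id=dag_id_to_trigger,  # id of the DAG to (re)run
    execution_date=trigger_date,       # execution date of the run to rerun
    reset_dag_run=True,                # clear an existing run instead of raising DagRunAlreadyExists
)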

Whether to clear the existing DAG run if it already exists. This is useful when backfilling or rerunning an existing DAG run. It only resets (does not recreate) the DAG run. The DAG run conf is immutable and will not be reset when an existing DAG run is rerun. When reset_dag_run=False and the DAG run exists, DagRunAlreadyExists is raised. When reset_dag_run=True and the DAG run exists, the existing DAG run is cleared so that it reruns.
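To make these rules concrete, here is a toy, in-memory model in plain Python. It is not Airflow code: FakeDagRunStore, FakeDagRun and the "pending" state label are hypothetical stand-ins, and the local DagRunAlreadyExists only mimics airflow.exceptions.DagRunAlreadyExists. It sketches the three behaviours described: a second trigger for the same date raises unless reset_dag_run is set, a reset reuses the existing run instead of creating a new one, and the run's conf stays unchanged.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Optional, Tuple


class DagRunAlreadyExists(Exception):
    """Local stand-in for airflow.exceptions.DagRunAlreadyExists (toy model)."""


@dataclass
class FakeDagRun:
    """Hypothetical, minimal DAG run: just its conf and a state label."""
    conf: dict
    state: str = "pending"


@dataclass
class FakeDagRunStore:
    """Toy store allowing one run per (dag_id, execution_date)."""
    runs: Dict[Tuple[str, datetime], FakeDagRun] = field(default_factory=dict)

    def trigger(self, dag_id: str, execution_date: datetime,
                conf: Optional[dict] = None,
                reset_dag_run: bool = False) -> FakeDagRun:
        key = (dag_id, execution_date)
        existing = self.runs.get(key)
        if existing is None:
            # No run for this date yet: create a fresh one with the given conf.
            run = FakeDagRun(conf=dict(conf or {}))
            self.runs[key] = run
            return run
        if not reset_dag_run:
            # Mirrors reset_dag_run=False: refuse to touch the existing run.
            raise DagRunAlreadyExists(
                f"DAG run for {dag_id} at {execution_date.isoformat()} already exists")
        # Mirrors reset_dag_run=True: reset (not recreate) the existing run.
        # The new conf argument is ignored because the run's conf is immutable.
        existing.state = "pending"
        return existing
```

For example, triggering the same date twice without reset_dag_run raises, while a later call with reset_dag_run=True returns the very same run object, still carrying its original conf.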

Upvotes: 1

Marc Lamberti

Reputation: 821

Airflow 2.0 brings a new version of the TriggerDagRunOperator that lets you rerun past DAGRuns. It's much easier now than before! The parameter reset_dag_run is exactly what you're looking for.
I've made a video about it, if that helps: https://youtu.be/8uKW0mPWmCk.
Have a good day!

Upvotes: 7

Scott G

Reputation: 1

I know it's been over a year, but an easier way to do this, one that doesn't require cluttering your DAG with a BranchPythonOperator and a DummyOperator, is to use the python_callable parameter of TriggerDagRunOperator (Airflow 1.x; this parameter was removed in Airflow 2.0) and put the logic inside that function.

For example:

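    from airflow.models import DagBag, DagRun
    from airflow.operators.dagrun_operator import TriggerDagRunOperator

    # dag_to_rerun (the child DAG) and trigger_dag (the parent DAG) are defined elsewhere.
    def conditionally_trigger(context, dro):
        dag_to_clear = dag_to_rerun.dag_id
        execution_date = context['execution_date']
        previous_dagrun = DagRun.find(dag_id=dag_to_clear,
                                      execution_date=execution_date)
        if previous_dagrun:
            # The child already ran for this date: clear its task instances so
            # the scheduler re-runs them, and skip triggering by returning None.
            child_dag = DagBag().get_dag(dag_to_clear)
            child_dag.clear(start_date=execution_date,
                            end_date=execution_date)
            return None
        # First run for this date: run_id must be unique per DAG, so derive it
        # from the execution date instead of using a constant string.
        dro.run_id = 'triggered__' + execution_date.isoformat()
        return dro

    trigger = TriggerDagRunOperator(
        task_id='trigger',
        dag=trigger_dag,
        python_callable=conditionally_trigger,
        trigger_dag_id=dag_to_rerun.dag_id,
        # Give the child the parent's execution date so DagRun.find() can match it.
        execution_date='{{ execution_date }}',
    )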
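A constant run_id such as 'First_time_triggered' can be used only once per DAG, because Airflow rejects a second DagRun with the same (dag_id, run_id) pair; the next trigger for a different execution date would fail with DagRunAlreadyExists again. The plain-Python sketch below is not Airflow code (make_run_id and colliding_run_ids are hypothetical helpers); it compares a constant run_id with one derived from the execution date.

```python
from datetime import datetime, timedelta
from typing import Iterable, List


def make_run_id(execution_date: datetime, prefix: str = "triggered__") -> str:
    """Hypothetical helper: derive one distinct run_id per execution date."""
    return prefix + execution_date.isoformat()


def colliding_run_ids(dag_id: str, run_ids: Iterable[str]) -> List[str]:
    """Toy check of the unique (dag_id, run_id) rule.

    Returns every run_id that would be rejected because an earlier run of the
    same DAG already used it.
    """
    seen = set()
    collisions = []
    for run_id in run_ids:
        key = (dag_id, run_id)
        if key in seen:
            collisions.append(run_id)
        else:
            seen.add(key)
    return collisions


# Three consecutive daily execution dates for the triggered DAG.
dates = [datetime(2021, 1, 1) + timedelta(days=i) for i in range(3)]
constant_ids = ["First_time_triggered" for _ in dates]
derived_ids = [make_run_id(d) for d in dates]
```

Here colliding_run_ids("child", constant_ids) reports the second and third triggers as collisions, while the date-derived ids never collide.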

Upvotes: 0

gecko655

Reputation: 352

Use the following steps:

  1. Use DagRun.find() to check whether a DagRun with the same execution_date already exists.
  2. Branch:
    1. If such a DagRun exists, clear it with dag.clear() (this resets the existing run's task instances rather than deleting the DagRun). There is no need to trigger a new DagRun, because the cleared tasks are re-run automatically.
    2. Otherwise, trigger a new DagRun.
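import airflow
from airflow.models import DagRun
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

dag_to_rerun = ...
trigger_dag = ...

def delete_previous_dagrun_func(dag_to_delete: airflow.DAG, **context):
    execution_date = context.get('execution_date')
    previous_dagruns = DagRun.find(dag_id=dag_to_delete.dag_id,
                                   execution_date=execution_date)
    if previous_dagruns:
        # Reset the existing run's task instances; the scheduler re-runs them.
        dag_to_delete.clear(
            start_date=execution_date,
            end_date=execution_date,
        )
        return 'no_op'    # follow the branch that does nothing
    else:
        return 'trigger'  # follow the branch that triggers a new DagRun

delete_previous_dagrun = BranchPythonOperator(
    task_id='delete_previous_dagrun',
    dag=trigger_dag,
    python_callable=delete_previous_dagrun_func,
    op_args=(dag_to_rerun,),
    provide_context=True,
)
trigger = TriggerDagRunOperator(
    task_id='trigger',
    dag=trigger_dag,
    trigger_dag_id=dag_to_rerun.dag_id,
    # Use the parent's execution date so DagRun.find() can match the run later.
    execution_date='{{ execution_date }}',
)
no_op = DummyOperator(
    task_id='no_op',
    dag=trigger_dag,
)
delete_previous_dagrun >> [trigger, no_op]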

Upvotes: 3
