A gupta

Reputation: 39

RunID is coming as manual even if the DAG is being scheduled using TriggerDagRunOperator

I am triggering my DAG automatically on a schedule using TriggerDagRunOperator, but the run_id comes out as manual_(time). I want the run_id to start with scheduled or something similar, not manual, so that I can distinguish these runs from manually triggered ones. I am using Airflow 2.

Issue: TriggerDagRunOperator also generates a run_id inside its execute method, right? We use that run_id in our pipeline. The problem is that the run_id comes out as manual_ followed by a timestamp. I want manual to be replaced with triggered, scheduled, or something similar.

Upvotes: 0

Views: 505

Answers (1)

Mazlum Tosun

Reputation: 6572

Here is a way to do it by subclassing TriggerDagRunOperator. In the subclass you can read the current context, apply your own logic based on the current run_id, and overwrite the trigger_run_id attribute with the newly computed value before the parent operator runs. The code should look like this:

from __future__ import annotations

import datetime
from typing import List, Dict, Optional, Union

from airflow.operators.trigger_dagrun import TriggerDagRunOperator


class CustomTriggerDagRunOperator(TriggerDagRunOperator):

    def __init__(self,
                 trigger_dag_id: str,
                 trigger_run_id: Optional[str] = None,
                 conf: Optional[Dict] = None,
                 execution_date: Optional[Union[str, datetime.datetime]] = None,
                 reset_dag_run: bool = False,
                 wait_for_completion: bool = False,
                 poke_interval: int = 60,
                 allowed_states: Optional[List] = None,
                 failed_states: Optional[List] = None,
                 **kwargs) -> None:
        super(CustomTriggerDagRunOperator, self).__init__(
            trigger_dag_id=trigger_dag_id,
            trigger_run_id=trigger_run_id,
            conf=conf,
            execution_date=execution_date,
            reset_dag_run=reset_dag_run,
            wait_for_completion=wait_for_completion,
            poke_interval=poke_interval,
            allowed_states=allowed_states,
            failed_states=failed_states,
            **kwargs
        )

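    def execute(self, context):
        # Value passed to the constructor (None if the caller did not set one).
        current_trigger_run_id = self.trigger_run_id

        # Apply your logic.
        self.trigger_run_id = ...

        # Let the parent operator trigger the DAG run with the overridden run_id.
        super(CustomTriggerDagRunOperator, self).execute(context)
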
  • Create a class CustomTriggerDagRunOperator that extends TriggerDagRunOperator.
  • Override the constructor of the operator and forward its arguments to the parent.
  • In execute, apply your logic based on the trigger_run_id field and assign the result back to it.
  • Call the execute method of the parent operator.
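
As a rough illustration of the "apply your logic" step, the run_id could be built from a prefix and a timestamp, following the same `<type>__<timestamp>` shape Airflow uses for its own run_ids. This is only a sketch: `build_triggered_run_id` and the `triggered` prefix are hypothetical names, not part of Airflow, and the commented usage assumes the task context exposes `logical_date` (Airflow 2.2+).

```python
from datetime import datetime, timezone


def build_triggered_run_id(logical_date: datetime, prefix: str = "triggered") -> str:
    """Build a run_id of the form '<prefix>__<ISO timestamp>'.

    Hypothetical helper, not an Airflow API: it only formats a string.
    """
    return f"{prefix}__{logical_date.isoformat()}"


# Assumed usage inside CustomTriggerDagRunOperator.execute:
#     self.trigger_run_id = build_triggered_run_id(context["logical_date"])
#
# Note: run_ids must be unique per DAG, so triggering the same target DAG
# twice with the same timestamp would need an extra distinguishing suffix.

if __name__ == "__main__":
    print(build_triggered_run_id(datetime(2024, 1, 1, tzinfo=timezone.utc)))
```

Since trigger_run_id is a templated field of TriggerDagRunOperator, passing something like trigger_run_id="triggered__{{ ts }}" directly to the stock operator may be enough if no custom logic is needed.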

Upvotes: 2
