Reputation: 39
I am triggering my DAG automatically using a schedule and TriggerDagRunOperator, but the run_id comes out as manual_(time). I want the run_id to read as scheduled or something similar, not manual, so I can distinguish it from a manually triggered DAG. I am using Airflow 2.
Issue: TriggerDagRunOperator also generates a run_id inside its execute method, right? We use that run_id in our pipeline. The problem is that the run_id comes as manual_ followed by a timestamp. I want manual to be replaced with triggered or scheduled.
Upvotes: 0
Views: 505
Reputation: 6572
Here is a way to override the TriggerDagRunOperator operator.
In this case, we can read the current context, apply logic based on the current run_id, and override the run_id parameter with the newly calculated value in the operator. The code should look like this:
```python
from __future__ import annotations

import datetime
from typing import Dict, List, Optional, Union

from airflow.operators.trigger_dagrun import TriggerDagRunOperator


class CustomTriggerDagRunOperator(TriggerDagRunOperator):
    """TriggerDagRunOperator that sets its own run_id before triggering."""

    def __init__(self,
                 trigger_dag_id: str,
                 trigger_run_id: Optional[str] = None,
                 conf: Optional[Dict] = None,
                 execution_date: Optional[Union[str, datetime.datetime]] = None,
                 reset_dag_run: bool = False,
                 wait_for_completion: bool = False,
                 poke_interval: int = 60,
                 allowed_states: Optional[List] = None,
                 failed_states: Optional[List] = None,
                 **kwargs) -> None:
        # Pass every argument through unchanged to the parent operator.
        super().__init__(
            trigger_dag_id=trigger_dag_id,
            trigger_run_id=trigger_run_id,
            conf=conf,
            execution_date=execution_date,
            reset_dag_run=reset_dag_run,
            wait_for_completion=wait_for_completion,
            poke_interval=poke_interval,
            allowed_states=allowed_states,
            failed_states=failed_states,
            **kwargs
        )

    def execute(self, context):
        # May be None when no trigger_run_id was passed to the operator.
        current_trigger_run_id = self.trigger_run_id
        # Apply your logic.
        self.trigger_run_id = ...
        # The parent uses self.trigger_run_id as the run_id of the triggered DAG run.
        super().execute(context)
```
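As one possible way to fill in the "Apply your logic" step above, here is a minimal sketch of a helper that builds the new run_id. The function name `build_run_id`, the default prefix `triggered`, and the `<prefix>__<ISO timestamp>` shape are assumptions chosen for illustration, not part of Airflow's API. The helper is plain Python so it can be checked without an Airflow installation.

```python
from datetime import datetime, timezone
from typing import Optional


def build_run_id(current_run_id: Optional[str],
                 run_date: datetime,
                 prefix: str = "triggered") -> str:
    """Return a run_id that starts with `prefix` instead of `manual`.

    Hypothetical helper: the name and the id format are illustrative choices.
    """
    if current_run_id:
        # A run_id was supplied: swap only a leading "manual" marker.
        if current_run_id.startswith("manual"):
            return prefix + current_run_id[len("manual"):]
        return current_run_id
    # No run_id supplied: build "<prefix>__<ISO timestamp>".
    return f"{prefix}__{run_date.isoformat()}"


# Intended use inside CustomTriggerDagRunOperator.execute (assumption):
#     self.trigger_run_id = build_run_id(
#         self.trigger_run_id, datetime.now(timezone.utc))
#     super().execute(context)

if __name__ == "__main__":
    print(build_run_id(None, datetime(2023, 1, 1, tzinfo=timezone.utc)))
```

Because a run_id has to be unique within the triggered DAG, reusing the same timestamp when the triggering task is retried may cause a clash, so the choice of `run_date` matters.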
In short: define a CustomTriggerDagRunOperator that extends the TriggerDagRunOperator operator, override the trigger_run_id field, and call the execute method on the parent operator.
Upvotes: 2