David Schuler
David Schuler

Reputation: 1031

Airflow: Set custom run_id for TriggerDagRunOperator

When using TriggerDagRunOperator to trigger another DAG, it just gives a generic name like trig_timestamp:

enter image description here

Is it possible to give this run id a meaningful name so I can easily identify different dag runs?

Upvotes: 5

Views: 6003

Answers (2)

Murthy Avanithsa
Murthy Avanithsa

Reputation: 31

One of the other options is REST API with apache >1.10

/api/experimental/dags/<DAG_ID>/dag_runs
{
    "conf":{"customer_id":"ABCDEF6000", "trans_id":"AN6000"},
    "run_id":"5dd89388-642a-4bf2-8776-d7a5284ee0d0"
}   

Upvotes: 3

PirateNinjas
PirateNinjas

Reputation: 2086

You can't immediately do this with the TriggerDagOperator as the "run_id" is generated inside it's execute method. However, you could implement your own operator, CustomTriggerDagOperator that would behave the way you want/need. For example:

class CustomTriggerDagOperator(TriggerDagOperator):
    def execute(self, context):
        if self.execution_date is not None:
            run_id = 'trig__{}'.format(self.execution_date)
            self.execution_date = timezone.parse(self.execution_date)
        else:
            run_id = 'trig__' + timezone.utcnow().isoformat()

        run_id += f'{self.trigger_dag_id}'

        dro = DagRunOrder(run_id=run_id)
        if self.python_callable is not None:
            dro = self.python_callable(context, dro)
        if dro:
            trigger_dag(dag_id=self.trigger_dag_id,
                        run_id=dro.run_id,
                        conf=json.dumps(dro.payload),
                        execution_date=self.execution_date,
                        replace_microseconds=False)
        else:
            self.log.info("Criteria not met, moving on")

This example above just appends the id of the triggered dag. You could use this same strategy to set the run_id arbitrarily.

Upvotes: 5

Related Questions