Altons

Reputation: 1424

dynamic task id names in Airflow

I have a DAG with one DataflowTemplateOperator that can deal with different JSON files. When I trigger the DAG I pass some parameters via {{dag_run.conf['param1']}} and it works fine.
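For context, a minimal sketch of the kind of setup I mean (the DAG id, template path, bucket, and Dataflow parameter names below are placeholders, not from my actual pipeline):

from datetime import datetime
from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator

with DAG(
    dag_id="read_object_json_file",  # placeholder DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,  # triggered manually, passing conf
) as dag:
    read_json = DataflowTemplatedJobStartOperator(
        task_id="df_operator_read_object_json_file",
        # 'parameters' is a templated field, so this Jinja renders at run time
        parameters={"inputFile": "gs://my-bucket/{{ dag_run.conf['param1'] }}.json"},
        template="gs://dataflow-templates/your_template",
        location="europe-west3",
    )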

The issue I have is trying to rename the task_id based on param1.

i.e. task_id="df_operator_read_object_json_file_{{dag_run.conf['param1']}}",

Airflow complains that a task_id may only contain alphanumeric characters, dashes, dots, and underscores (the Jinja braces are rejected). I also tried

task_id="df_operator_read_object_json_file_{}".format(dag_run.conf['param1']), but dag_run is not defined at DAG-parse time, on top of the same character restriction.

The whole idea behind this is that when I look at the Dataflow jobs console and a job has failed, I know who the offender is based on param1. Dataflow job names are based on the task_id, like this:

df-operator-read-object-json-file-8b9eecec

and what I need is this:

df-operator-read-object-param1-json-file-8b9eecec

Any ideas if this is possible?

Upvotes: 2

Views: 1520

Answers (1)

Elad Kalif

Reputation: 15979

There is no need to generate a new operator per file. DataflowTemplatedJobStartOperator has a job_name parameter which is also templated, so it can be used with Jinja.

I didn't test it but this should work:

from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator

op = DataflowTemplatedJobStartOperator(
    task_id="df_operator_read_object_json_file",
    # job_name is a templated field, so the Jinja expression renders at run time
    job_name="df_operator_read_object_json_file_{{ dag_run.conf['param1'] }}",
    template="gs://dataflow-templates/your_template",
    location="europe-west3",
)
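Then trigger the DAG with the conf so the Jinja renders (the DAG id and param value here are just examples):

airflow dags trigger --conf '{"param1": "customer_a"}' my_dag_id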

Upvotes: 2
