Reputation: 1424
I have a DAG with one DataflowTemplateOperator that can handle different JSON files. When I trigger the DAG I pass some parameters via {{ dag_run.conf['param1'] }} and it works fine.
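For context, this is roughly what I have (the template path, bucket and parameter names are simplified placeholders):

from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

# Triggered with something like:
#   airflow trigger_dag -c '{"param1": "customers"}' my_dag
read_json = DataflowTemplateOperator(
    task_id="df_operator_read_object_json_file",
    # parameters is a templated field, so dag_run.conf is resolved at run time
    parameters={"inputFile": "gs://my-bucket/{{ dag_run.conf['param1'] }}.json"},
    template="gs://dataflow-templates/your_template",
)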
The issue I have is trying to rename the task_id based on param1, e.g.
task_id="df_operator_read_object_json_file_{{ dag_run.conf['param1'] }}",
which complains that task_id may only contain alphanumeric characters (the literal Jinja braces are rejected because task_id is not a templated field), or
task_id="df_operator_read_object_json_file_{}".format(dag_run.conf['param1']),
which does not recognise dag_run at DAG-parse time, plus the same character issue.
The whole idea behind this is that when I look at the Dataflow jobs console and a job has failed, I know who the offender is based on param1. Dataflow job names are based on task_id, like this:
df-operator-read-object-json-file-8b9eecec
and what I need is this:
df-operator-read-object-param1-json-file-8b9eecec
Any ideas if this is possible?
Upvotes: 2
Views: 1520
Reputation: 15979
There is no need to generate a new operator per file. DataflowTemplatedJobStartOperator has a job_name parameter which is also templated, so it can be used with Jinja. I didn't test it, but this should work:
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator

op = DataflowTemplatedJobStartOperator(
    task_id="df_operator_read_object_json_file",
    # job_name is a templated field, so the Jinja expression renders per DAG run
    job_name="df_operator_read_object_json_file_{{ dag_run.conf['param1'] }}",
    template='gs://dataflow-templates/your_template',
    location='europe-west3',
)
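If you then trigger the DAG with e.g. airflow trigger_dag -c '{"param1": "customers"}' your_dag, the rendered job name should come out along the lines of df-operator-read-object-json-file-customers-8b9eecec, since (as your own example shows) underscores get converted to hyphens and a unique suffix is appended, so param1 is visible directly in the Dataflow console.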
Upvotes: 2