Reputation: 1344
I have the DAG code below.
import pendulum
from airflow import DAG
from airflow.decorators import dag, task
from custom_operators.profile_data_and_update_test_suite_operator import ProfileDataAndUpdateTestSuiteOperator
from custom_operators.validate_data_operator import ValidateDataOperator
from airflow.models import Variable
connstring = Variable.get("SECRET_SNOWFLAKE_DEV_CONNECTION_STRING")
@dag('profile_and_validate_data', schedule_interval=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False)
def taskflow():
    profile_data = ProfileDataAndUpdateTestSuiteOperator(
        task_id="profile_data",
        asset_name="{{ dag_run.conf['asset_name'] }}",
        data_format="sql",
        connection_string=connstring
    )
    validate_data = ValidateDataOperator(
        task_id="validate_data",
        asset_name="{{ dag_run.conf['asset_name'] }}",
        data_format="sql",
        connection_string=connstring,
        trigger_rule="all_done"
    )
    profile_data >> validate_data

dag = taskflow()
But the asset_name parameter shows up as the raw string "{{ dag_run.conf['asset_name'] }}" rather than the value from the configuration I pass when triggering the DAG, which I expected Jinja to render.
What am I doing wrong here?
Upvotes: 0
Views: 2476
Reputation: 3661
BaseOperator has a class attribute "template_fields" that lists the names of the fields whose values Airflow renders as Jinja templates at run time. Any field not listed there is passed through unchanged.
You need to list the field "asset_name" in your custom operators (ProfileDataAndUpdateTestSuiteOperator, ValidateDataOperator). Note that the entry is the attribute name as a string, and Sequence comes from typing:
template_fields: Sequence[str] = ("asset_name",)
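To illustrate why the raw string comes through, here is a minimal toy model of the mechanism, written with the standard library only. It is not Airflow's code: ToyOperator, render, and the conf value "orders" are hypothetical, and the regex is a stand-in for Jinja that only understands the one expression from the question.

```python
import re
from typing import Any, Dict, Sequence

# Stand-in for Jinja (assumption): only handles {{ dag_run.conf['key'] }}.
_CONF_PATTERN = re.compile(r"\{\{\s*dag_run\.conf\['(\w+)'\]\s*\}\}")


def render(value: str, conf: Dict[str, Any]) -> str:
    """Replace each {{ dag_run.conf['key'] }} with the matching conf value."""
    return _CONF_PATTERN.sub(lambda m: str(conf[m.group(1)]), value)


class ToyOperator:
    """Hypothetical base class that mimics the template_fields idea."""

    # Attribute names (as strings) whose values get rendered.
    template_fields: Sequence[str] = ()

    def __init__(self, asset_name: str, data_format: str) -> None:
        self.asset_name = asset_name
        self.data_format = data_format

    def render_template_fields(self, conf: Dict[str, Any]) -> None:
        # Only attributes named in template_fields are touched.
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), conf))


class UntemplatedOperator(ToyOperator):
    """No template_fields declared, like the operators in the question."""


class TemplatedOperator(ToyOperator):
    """Declares asset_name as a templated field."""

    template_fields = ("asset_name",)


conf = {"asset_name": "orders"}  # hypothetical trigger configuration
raw = "{{ dag_run.conf['asset_name'] }}"

untemplated = UntemplatedOperator(asset_name=raw, data_format="sql")
untemplated.render_template_fields(conf)

templated = TemplatedOperator(asset_name=raw, data_format="sql")
templated.render_template_fields(conf)

print(untemplated.asset_name)  # still the raw template string
print(templated.asset_name)    # orders
```

In a real operator the equivalent change is just the template_fields line on the class. Airflow renders the listed attributes before execute runs, so the constructor should store asset_name on self under that same name.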
Upvotes: 2
Reputation: 1
render_template_as_native_obj is set to False by default on the DAG. With False, rendered templates are returned as strings; change it to True to get native Python objects.
@dag('profile_and_validate_data', schedule_interval=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, render_template_as_native_obj=True)
Upvotes: 0