Reputation: 2411
I have an existing data pipeline in Airflow (through Google Cloud Composer) in which I execute Python- and SQL-based tasks. Now I am working on migrating it to Dataform (the Google Cloud version, not Dataform.co, if that matters).
In my SQL-based tasks I can easily "catch" and use the parameters with {{params.variable}}. For example, if I pass the variables country and city from Airflow, I currently do this in the SQL file called by the Airflow DAG:
SELECT id, name FROM my_{{params.country}}_dataset.abc123_{{params.city}}_table
so that I can run the same SQL script for several countries and cities from Airflow.
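For reference, the Airflow side currently looks roughly like this (operator choice and names are simplified here; the query actually lives in a separate .sql file):
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

run_query = BigQueryInsertJobOperator(
    task_id="run_query",
    configuration={
        "query": {
            # Renders at runtime to e.g. my_se_dataset.abc123_stockholm_table
            "query": "SELECT id, name FROM my_{{ params.country }}_dataset.abc123_{{ params.city }}_table",
            "useLegacySql": False,
        }
    },
    params={"country": "se", "city": "stockholm"},
)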
What is the equivalent in Dataform / sqlx? I would preferably use this in the sources/config where I define my data sources in Dataform.
Upvotes: 1
Views: 2178
Reputation: 1
I've tried the method provided by Lucas. It worked, but I had to change compilation_result to:
compilation_result = {
    "code_compilation_config": {
        "default_schema": DEFAULT_SCHEMA,
        "vars": {"country": some_country, "city": some_city},
    },
}
BTW, my version of the Google provider is apache-airflow-providers-google==8.9.0.
Upvotes: 0
Reputation: 36
To execute your Dataform code, you first need to compile it. Fortunately, you can do this through Airflow using the DataformCreateCompilationResultOperator. This Airflow operator takes a parameter called compilation_result, which is where you pass your variables (the ones from dataform.json).
Below is an example:
compilation_result = {
    "code_compilation_config": {"default_schema": DEFAULT_SCHEMA},
    "vars": {
        "country": some_country,
        "city": some_city,
    },
}
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.utils.dates import days_ago

create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=DATAFORM_REGION,
    repository_id=DATAFORM_REPOSITORY_ID,
    compilation_result=compilation_result,
    start_date=days_ago(1),
    timeout=None,
)
Then you can use the DataformCreateWorkflowInvocationOperator to execute your workflow:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create_workflow_invocation",
    project_id=PROJECT_ID,
    region=DATAFORM_REGION,
    repository_id=DATAFORM_REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
    },
    start_date=days_ago(1),
    timeout=None,
)
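Finally, chain the two tasks so the invocation only runs once the compilation result exists and its name can be pulled from XCom:
# The invocation depends on the compilation result created upstream
create_compilation_result >> create_workflow_invocation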
Upvotes: 1
Reputation: 5110
You can define your query as:
SELECT id, name FROM my_${dataform.projectConfig.vars.country}_dataset.abc123_${dataform.projectConfig.vars.city}_table
Then you can provide default values for the variables in the dataform.json file:
{
    ...
    "vars": {
        "country": "some_country",
        "city": "some_city"
    },
    ...
}
and override them using the Dataform CLI:
dataform run --vars=country=another_country,city=another_city
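Since you mentioned you would prefer this in the place where you define your data sources: source declarations can also live in a JavaScript file, where the same vars are available. A sketch, with illustrative dataset/table names:
// definitions/sources.js -- assumes country and city are defined in dataform.json
declare({
  schema: `my_${dataform.projectConfig.vars.country}_dataset`,
  name: `abc123_${dataform.projectConfig.vars.city}_table`,
});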
Upvotes: 1