Christoffer

Reputation: 2411

How do I pass variables from Airflow to (GCP) Dataform?

I have an existing data pipeline in Airflow (through Google Cloud Composer) in which I execute Python and SQL-based tasks. Now I am working on migrating it to Dataform (the Google Cloud version, not Dataform.co, if that matters).

In my SQL-based tasks I can easily "catch" and use the parameters with {{params.variable}}. For example, if I pass the variables country and city from Airflow, I currently do this in the SQL file called by the Airflow DAG:

SELECT id, name FROM my_{{params.country}}_dataset.abc123_{{params.city}}_table

so that I can run the same SQL script for several countries and cities from Airflow.
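
For context, the DAG side of my setup looks roughly like this (the operator choice and the country/city values are simplified for illustration):

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

select_task = BigQueryInsertJobOperator(
    task_id="select_country_city",
    configuration={
        "query": {
            # the templated statement shown above
            "query": "SELECT id, name FROM my_{{ params.country }}_dataset.abc123_{{ params.city }}_table",
            "useLegacySql": False,
        }
    },
    params={"country": "se", "city": "stockholm"},  # illustrative values
)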

What is the equivalent in Dataform / sqlx? Preferably, I want to use this in the source/config files in which I define my data sources in Dataform.

Upvotes: 1

Views: 2178

Answers (3)

J.C.

Reputation: 1

I've tried the method provided by Lucas. It worked, but I had to change compilation_result to the following:

compilation_result = {
    "code_compilation_config": {
        "default_schema": DEFAULT_SCHEMA,
        # "vars" belongs inside "code_compilation_config", not at the top level
        "vars": {"country": some_country, "city": some_city},
    },
}

BTW, my version of the Google provider is apache-airflow-providers-google==8.9.0.

Ref: https://cloud.google.com/dataform/reference/rest/v1beta1/projects.locations.repositories.compilationResults#codecompilationconfig
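
Plugged into the same operator as in Lucas's answer, it would look like:

DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=DATAFORM_REGION,
    repository_id=DATAFORM_REPOSITORY_ID,
    compilation_result=compilation_result,  # with "vars" nested as above
)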

Upvotes: 0

Lucas

Reputation: 36

To execute your Dataform code, you first need to compile it. Fortunately, you can do this through Airflow using the DataformCreateCompilationResultOperator. This operator takes a parameter called compilation_result, which is where you can pass your variables (the ones from dataform.json).

Below is an example:

compilation_result = {
    "code_compilation_config": {"default_schema": DEFAULT_SCHEMA},
    "vars": {
        "country": some_country,
        "city": some_city
    }
}

create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=DATAFORM_REGION,
    repository_id=DATAFORM_REPOSITORY_ID,
    compilation_result=compilation_result,
    start_date=days_ago(1),
    timeout=None,
)

Then you can use the DataformCreateWorkflowInvocationOperator to execute your workflow:

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=DATAFORM_REGION,
    repository_id=DATAFORM_REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
    },
    start_date=days_ago(1),
    timeout=None
)
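
Finally, chain the two tasks so the invocation only runs after the compilation has succeeded:

create_compilation_result >> create_workflow_invocation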

Upvotes: 1

Hussein Awala

Reputation: 5110

You can define your query as:

SELECT id, name FROM my_${dataform.projectConfig.vars.country}_dataset.abc123_${dataform.projectConfig.vars.city}_table

Then you can provide the variables' default values in the dataform.json file:

{
  ...
  "vars": {
    "country": "some_country",
    "city": "some_city"
  },
  ...
}

and override them using the Dataform CLI:

dataform run --vars=country=another_country,city=another_city
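
And since you mention wanting this in the source/config: the values in a .sqlx config block are evaluated as JavaScript, so a source declaration along these lines should work (the dataset/table names just mirror your example):

config {
  type: "declaration",
  schema: "my_" + dataform.projectConfig.vars.country + "_dataset",
  name: "abc123_" + dataform.projectConfig.vars.city + "_table"
}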

Upvotes: 1
