kfondingle

Reputation: 21

How do I pass the run date (ds) as a variable from Airflow to Dataform (GCP)

I have a Dataform pipeline that reads various BigQuery source tables based on the current run date. I trigger the pipeline from Airflow and currently use CURRENT_DATE to query the source tables, which works fine until I need to re-run a task for a previous day.

I would like to pass the run date from the Airflow DAG to Dataform so that I can re-run a task for any previous date. In the future I may also need to introduce other variables.

I've only found one other question on here similar to this one but it hasn't been resolved: here

I know I can add a vars object to the dataform.json file and set the date there, and I think this is what I have to use or override somehow:

{
  "defaultSchema": "dataform",
  "assertionSchema": "dataform_assertions",
  "warehouse": "bigquery",
  "defaultDatabase": "prod-dna-pipelines",
  "defaultLocation": "EU",
  "vars" : {
    "date" : "2023-05-01"
  }
}

I've tried variations of the below, adding the vars object to the compilation_result argument of DataformCreateCompilationResultOperator:

create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "workspace": (
                f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
                f"workspaces/{WORKSPACE_ID}"
            ),
            "vars": {
                "date": "{{ ds }}"
            }
        },
    )

But this just results in the Airflow task failing like so:

ERROR - Failed to execute job 44542 for task create_compilation_result (Protocol message CompilationResult has no "vars" field.; 154829)

I'm pretty stuck at this point. Is there a way to achieve this? Is it even possible?

Upvotes: 2

Views: 1236

Answers (2)

NovasVilla

Reputation: 56

The best fix I found is to create a TemplatedDataformCreateCompilationResultOperator that inherits from the official DataformCreateCompilationResultOperator. The problem is that DataformCreateCompilationResultOperator does not declare any template_fields, yet you are trying to use compilation_result as one, so {{ ds }} is passed through unrendered.

My example:

from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
)


class TemplatedDataformCreateCompilationResultOperator(
    DataformCreateCompilationResultOperator
):
    template_fields = DataformCreateCompilationResultOperator.template_fields + (
        "compilation_result",
    )

    def execute(self, context):
        # By this point Airflow has rendered any Jinja in compilation_result.
        self.log.info(
            "Executing TemplatedDataformCreateCompilationResultOperator "
            "with compilation_result: %s",
            self.compilation_result,
        )

        result = super().execute(context)

        self.log.info("Compilation result as dictionary: %s", result)
        return result
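
To illustrate what declaring compilation_result as a template field changes: Airflow walks the dicts and lists inside a templated field and renders each string it finds. The sketch below imitates that walk with plain string replacement as a stand-in for Jinja, so it runs without Airflow installed. It is not Airflow's implementation; render_nested and the context dict are hypothetical names, and "main" is a placeholder branch. The vars key is nested under code_compilation_config, which is where the CompilationResult message defines it.

```python
from typing import Any, Mapping


def render_nested(value: Any, context: Mapping[str, str]) -> Any:
    """Recursively render strings inside dicts, lists, and tuples.

    Replacing "{{ key }}" literally is a simplified stand-in for Jinja.
    """
    if isinstance(value, str):
        for key, replacement in context.items():
            value = value.replace("{{ " + key + " }}", replacement)
        return value
    if isinstance(value, dict):
        return {k: render_nested(v, context) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(render_nested(v, context) for v in value)
    # Non-string leaves (numbers, None, ...) are returned unchanged.
    return value


compilation_result = {
    "git_commitish": "main",  # placeholder branch name
    "code_compilation_config": {"vars": {"date": "{{ ds }}"}},
}

rendered = render_nested(compilation_result, {"ds": "2023-05-01"})
print(rendered["code_compilation_config"]["vars"]["date"])
```

Without the template field declaration, the operator would receive the literal string "{{ ds }}" instead of the rendered date.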

Info about the official operator: DataformCreateCompilationResultOperator


Upvotes: 1

axel carcamo

Reputation: 11

The vars parameter must be inside the code_compilation_config object:

create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": GIT_COMMITISH,
        "code_compilation_config": {
            "vars": {
                "PROCESS_DATE": "2023-05-08"
            }
        },
    },
)
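
If more variables are added later, the nesting can be kept in one place with a small helper. This is a minimal sketch, not part of the Airflow provider: build_compilation_result is a hypothetical name, "main" is a placeholder branch, and LOOKBACK_DAYS is an invented variable for illustration. Values are coerced to strings because the vars field is a string-to-string map.

```python
from typing import Any, Dict, Mapping


def build_compilation_result(
    git_commitish: str, variables: Mapping[str, Any]
) -> Dict[str, Any]:
    """Return a compilation_result dict with vars nested under code_compilation_config."""
    return {
        "git_commitish": git_commitish,
        "code_compilation_config": {
            # Coerce keys and values to str: vars is a string-to-string map.
            "vars": {str(k): str(v) for k, v in variables.items()},
        },
    }


payload = build_compilation_result(
    "main",  # placeholder branch name
    {"PROCESS_DATE": "2023-05-08", "LOOKBACK_DAYS": 7},  # LOOKBACK_DAYS is hypothetical
)
print(payload)
```

The returned dict can be passed as the compilation_result argument of the operator.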

If you want to use {{ ds }} with DataformCreateCompilationResultOperator, you can instantiate the operator inside a @task-decorated function, where ds is available as a plain Python argument. Here is an example:

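from datetime import datetime

from airflow import models
from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

# PROJECT_ID, REGION, REPOSITORY_ID, GIT_COMMITISH, DAG_ID and
# default_dag_args are assumed to be defined elsewhere in the DAG file.


@task
def transform(ds=None):
    # Airflow injects the run date (YYYY-MM-DD) into the ds argument.
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": {
                "vars": {
                    "PROCESS_DATE": ds
                }
            },
        },
    )
    # Run the operator manually; the dict it returns is pushed to XCom.
    return create_compilation_result.execute(get_current_context())


with models.DAG(
    DAG_ID,
    default_args=default_dag_args,
    schedule_interval='0 3 * * *',
    start_date=datetime(2022, 1, 1),
    catchup=False
) as dag:

    compilation_result_var = transform()

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            # Use the name of the compilation result returned by transform.
            "compilation_result": "{{ task_instance.xcom_pull(task_ids='transform')['name'] }}"
        },
    )

    compilation_result_var >> create_workflow_invocation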
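
The xcom_pull expression above takes the dict returned by the transform task and reads its name key. As a rough sketch of that hand-off in plain Python (build_workflow_invocation is a hypothetical helper, and the resource name below is a made-up placeholder, not real output):

```python
from typing import Any, Dict, Mapping


def build_workflow_invocation(compilation_result: Mapping[str, Any]) -> Dict[str, str]:
    """Build the workflow_invocation payload from a compilation result dict."""
    # Only the resource name of the compilation result is forwarded.
    return {"compilation_result": compilation_result["name"]}


# Stand-in for the value the transform task would push to XCom.
fake_xcom_value = {
    "name": (
        "projects/example-project/locations/europe-west1/"
        "repositories/example-repo/compilationResults/example-id"
    ),
    "git_commitish": "main",
}

workflow_invocation = build_workflow_invocation(fake_xcom_value)
print(workflow_invocation)
```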

Upvotes: 1
