Reputation: 21
I have a Dataform pipeline that reads various BigQuery source tables based on the current run date. I use Airflow to trigger the pipeline, and I currently query the source tables using CURRENT_DATE. That works fine until I need to re-run a task from a previous day for any reason.
I would like to take the run date from the Airflow DAG and pass it to Dataform so that I can re-run a task for any previous date. In the future I may also need to introduce other variables.
I've only found one other question on here similar to this one, and it hasn't been resolved.
I know I can add a vars object to the dataform.json file and put the date there, and I think this is what I have to use or override somehow:
{
  "defaultSchema": "dataform",
  "assertionSchema": "dataform_assertions",
  "warehouse": "bigquery",
  "defaultDatabase": "prod-dna-pipelines",
  "defaultLocation": "EU",
  "vars": {
    "date": "2023-05-01"
  }
}
I've tried variations of the code below, adding the vars object to the compilation result passed to DataformCreateCompilationResultOperator:
create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": GIT_COMMITISH,
        "workspace": (
            f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
            f"workspaces/{WORKSPACE_ID}"
        ),
        "vars": {
            "date": "{{ ds }}"
        }
    },
)
But this just results in the Airflow task failing like so:
ERROR - Failed to execute job 44542 for task create_compilation_result (Protocol message CompilationResult has no "vars" field.; 154829)
I'm pretty stuck at this point. Is there a way to achieve this? Is it even possible?
Upvotes: 2
Views: 1236
Reputation: 56
The best way I found to fix this issue is to create a TemplatedDataformCreateCompilationResultOperator that inherits from the official DataformCreateCompilationResultOperator. The problem is that DataformCreateCompilationResultOperator does not declare any template_fields, and you are trying to use compilation_result as one, so the "{{ ds }}" expression is never rendered.
My example:
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
)

class TemplatedDataformCreateCompilationResultOperator(
    DataformCreateCompilationResultOperator
):
    # Add compilation_result to whatever the parent already declares, so Jinja
    # expressions such as "{{ ds }}" inside the dict are rendered before execute().
    template_fields = (
        *DataformCreateCompilationResultOperator.template_fields,
        "compilation_result",
    )

    def execute(self, context):
        # By this point the templated fields have already been rendered.
        self.log.info(
            f"Executing TemplatedDataformCreateCompilationResultOperator with compilation_result: {self.compilation_result}"
        )
        result = super().execute(context)
        self.log.info(f"Compilation result as dictionary: {result}")
        return result
More information about the official operator: DataformCreateCompilationResultOperator
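To illustrate why the field has to be listed in template_fields: Airflow only renders Jinja expressions in attributes named there, and it recurses into nested dicts and lists when it does. The sketch below imitates that behaviour with plain string substitution. render_field, FakeOperator and TemplatedFakeOperator are hypothetical stand-ins written for this example; they are not Airflow code and do not use Jinja.

```python
from typing import Any


def render_field(value: Any, context: dict) -> Any:
    """Recursively replace '{{ key }}' placeholders in strings, dicts, lists and tuples."""
    if isinstance(value, str):
        for key, replacement in context.items():
            value = value.replace("{{ " + key + " }}", replacement)
        return value
    if isinstance(value, dict):
        # Only the values are rendered; keys are left untouched.
        return {k: render_field(v, context) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(render_field(v, context) for v in value)
    return value


class FakeOperator:
    """Stand-in for an operator that declares no templated fields."""

    template_fields = ()

    def __init__(self, compilation_result: dict):
        self.compilation_result = compilation_result

    def render_template_fields(self, context: dict) -> None:
        # Attributes not named in template_fields are never touched.
        for name in self.template_fields:
            setattr(self, name, render_field(getattr(self, name), context))


class TemplatedFakeOperator(FakeOperator):
    """Same operator, with compilation_result opted in to rendering."""

    template_fields = (*FakeOperator.template_fields, "compilation_result")
```

With the same input dictionary, the plain stand-in keeps the literal "{{ ds }}" string, while the templated subclass ends up with the substituted date. That is the difference the subclass above is meant to produce for the real operator.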
Upvotes: 1
Reputation: 11
The vars parameter must be inside the code_compilation_config object:
create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": GIT_COMMITISH,
        "code_compilation_config": {
            "vars": {
                "PROCESS_DATE": "2023-05-08"
            }
        }
    },
)
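As a small sketch of the same structure, the helper below builds the compilation_result dictionary with vars nested under code_compilation_config. The function name build_compilation_result and the extra_vars parameter are made up for illustration. The PROCESS_DATE variable comes from the snippet above, and values are coerced to strings on the assumption that vars is a string-to-string map.

```python
from datetime import date


def build_compilation_result(git_commitish, process_date, extra_vars=None):
    """Return a compilation_result dict with vars nested under code_compilation_config."""
    # Fail early on malformed input; raises ValueError if this is not an ISO date.
    date.fromisoformat(process_date)

    compilation_vars = {"PROCESS_DATE": process_date}
    # Coerce any additional values (numbers, booleans) to strings.
    for key, value in (extra_vars or {}).items():
        compilation_vars[key] = str(value)

    return {
        "git_commitish": git_commitish,
        # vars does not belong at the top level of compilation_result.
        "code_compilation_config": {"vars": compilation_vars},
    }
```

The returned dictionary can then be passed as the compilation_result argument of the operator, which also leaves room for the other variables mentioned in the question.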
If you want to use {{ ds }} inside DataformCreateCompilationResultOperator, you can wrap the operator in a task created with the @task decorator. Here is an example:
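from datetime import datetime

from airflow import models
from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

# PROJECT_ID, REGION, REPOSITORY_ID, GIT_COMMITISH, DAG_ID and default_dag_args
# are assumed to be defined elsewhere in the DAG file.

@task
def transform(ds=None, execution_date=None, ti=None):
    # Airflow injects ds (the run's logical date as YYYY-MM-DD) at run time.
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": {
                "vars": {
                    "PROCESS_DATE": ds
                }
            }
        },
    )
    # Run the operator by hand; its return value becomes this task's XCom.
    return create_compilation_result.execute(get_current_context())

with models.DAG(
    DAG_ID,
    default_args=default_dag_args,
    schedule_interval='0 3 * * *',
    start_date=datetime(2022, 1, 1),
    catchup=False
) as dag:
    compilation_result_var = transform()
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            # Pull the compilation result name returned by the transform task.
            "compilation_result": "{{ task_instance.xcom_pull('transform')['name'] }}"
        },
    )
    compilation_result_var >> create_workflow_invocation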
Upvotes: 1