Majobber

Reputation: 45

DataflowTemplatedJobStartOperator in Apache Airflow not setting Job Region as expected

I'm running Apache Airflow 2.2.5 on Google Cloud Composer 2.0.31. The region of my Composer instance is europe-west1. I'm trying to use Composer to trigger a Dataflow job in a different project, using the DAG below with DataflowTemplatedJobStartOperator. The issue I'm running into is that when the DAG executes, the job region ends up as us-central1 while the worker location is europe-west1. I've tried lots of different combinations of parameters and can't seem to get the job region to be europe-west1 as well. Any ideas on what I might be doing wrong?


import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago


default_args = {
    "start_date": days_ago(1),
    "retries":0,
    "dataflow_default_options": {
        "project": "my-project",
        "location": "europe-west1",
        "zone": "europe-west1-b",
        "stagingLocation": "gs://my-bucket-temp/temp/",
        "tempLocation": "gs://my-bucket-temp/temp/",
        "workerMachineType": "n1-standard-1",
    },
}

with models.DAG(
    "dataflow-batch-redis-revision-1",
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),  
) as dag:

    start_template_job = DataflowTemplatedJobStartOperator(
        task_id="dataflow_operator_batch_bq_to_redis",
        template="gs://my-bucket-temp/template/BatchRedisUpdatePipelineTemplate",
        parameters={
            "inputTopic": "inputtopic",
            "bigQueryInputProject": "inputproject",
            "bigQueryInputDataset": "dataset",
            "bigQueryInputTable": "table",
            "bigQueryInputSQLBranchMetadata": "DUMMY",
            "bigQueryInputSQLBranchSkuWeek": "DUMMY",
            "redisHost": "host",
            "redisPort": "6379",
        },
    )

Upvotes: 2

Views: 1694

Answers (1)

Mazlum Tosun

Reputation: 6572

For this operator you have to pass the location argument explicitly, because its default value is us-central1:

DataflowTemplatedJobStartOperator(
    task_id="dataflow_operator_batch_bq_to_redis",
    template="gs://my-bucket-temp/template/BatchRedisUpdatePipelineTemplate",
    location="europe-west1",
    parameters={
        "inputTopic": "inputtopic",
        "bigQueryInputProject": "inputproject",
        "bigQueryInputDataset": "dataset",
        "bigQueryInputTable": "table",
        "bigQueryInputSQLBranchMetadata": "DUMMY",
        "bigQueryInputSQLBranchSkuWeek": "DUMMY",
        "redisHost": "host",
        "redisPort": "6379",
    },
)

In the constructor of this operator, we can see that the location field defaults to us-central1:

DEFAULT_DATAFLOW_LOCATION = "us-central1"

def __init__(
    self,
    *,
    template: str,
    job_name: str = "{{task.task_id}}",
    options: Optional[Dict[str, Any]] = None,
    dataflow_default_options: Optional[Dict[str, Any]] = None,
    parameters: Optional[Dict[str, str]] = None,
    project_id: Optional[str] = None,
    location: str = DEFAULT_DATAFLOW_LOCATION,
    gcp_conn_id: str = "google_cloud_default",
    delegate_to: Optional[str] = None,
    poll_sleep: int = 10,
    impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
    environment: Optional[Dict] = None,
    cancel_timeout: Optional[int] = 10 * 60,
    wait_until_finished: Optional[bool] = None,
    **kwargs,
)

If you pass "region": "europe-west1" only in the Dataflow default options, this Airflow operator will overwrite that value with the location field value.

That's why you have to pass the location argument on the operator itself.
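
To make the behavior concrete, here is a rough, hypothetical sketch (not the provider's actual source): the job region sent to the Dataflow API comes from the operator's own location attribute, so a "location"/"region" entry that only lives in dataflow_default_options never reaches it.

# Hypothetical illustration of the override described above -- not the
# provider's real code. It only shows that the operator's own `location`
# attribute (defaulting to us-central1) decides the job region.

DEFAULT_DATAFLOW_LOCATION = "us-central1"


def resolve_job_region(operator_location=None, dataflow_default_options=None):
    # dataflow_default_options is intentionally ignored here, mirroring the
    # fact that its "location"/"region" entry is overwritten by the
    # operator-level value.
    return operator_location or DEFAULT_DATAFLOW_LOCATION


# Question's DAG: location set only in dataflow_default_options -> us-central1
print(resolve_job_region(dataflow_default_options={"location": "europe-west1"}))

# Fixed DAG: location passed on the operator itself -> europe-west1
print(resolve_job_region(operator_location="europe-west1"))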

Upvotes: 4
