Reputation: 45
I'm running Apache Airflow 2.2.5 on Google Cloud Composer 2.0.31. The region for my Composer instance is europe-west1. I'm trying to use Composer to trigger a Dataflow job in a different project, using the DAG below with DataflowTemplatedJobStartOperator. The issue I'm running into is that when the DAG executes, the job region ends up as us-central1 while the worker_location is europe-west1. I've tried lots of different combinations of parameters and can't seem to get the job region to be europe-west1 as well. Any ideas on what I might be doing wrong?
import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

default_args = {
    "start_date": days_ago(1),
    "retries": 0,
    "dataflow_default_options": {
        "project": "my-project",
        "location": "europe-west1",
        "zone": "europe-west1-b",
        "stagingLocation": "gs://my-bucket-temp/temp/",
        "tempLocation": "gs://my-bucket-temp/temp/",
        "workerMachineType": "n1-standard-1",
    },
}

with models.DAG(
    "dataflow-batch-redis-revision-1",
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    start_template_job = DataflowTemplatedJobStartOperator(
        task_id="dataflow_operator_batch_bq_to_redis",
        template="gs://my-bucket-temp/template/BatchRedisUpdatePipelineTemplate",
        parameters={
            "inputTopic": "inputtopic",
            "bigQueryInputProject": "inputproject",
            "bigQueryInputDataset": "dataset",
            "bigQueryInputTable": "table",
            "bigQueryInputSQLBranchMetadata": "DUMMY",
            "bigQueryInputSQLBranchSkuWeek": "DUMMY",
            "redisHost": "host",
            "redisPort": "6379",
        },
    )
Upvotes: 2
Views: 1694
Reputation: 6572
For this operator you have to pass the location explicitly, because its default value is us-central1:
DataflowTemplatedJobStartOperator(
    task_id="dataflow_operator_batch_bq_to_redis",
    template="gs://my-bucket-temp/template/BatchRedisUpdatePipelineTemplate",
    location="europe-west1",
    parameters={
        "inputTopic": "inputtopic",
        "bigQueryInputProject": "inputproject",
        "bigQueryInputDataset": "dataset",
        "bigQueryInputTable": "table",
        "bigQueryInputSQLBranchMetadata": "DUMMY",
        "bigQueryInputSQLBranchSkuWeek": "DUMMY",
        "redisHost": "host",
        "redisPort": "6379",
    },
)
In the constructor of this operator, we can see that the location field has a default value of us-central1:
DEFAULT_DATAFLOW_LOCATION = "us-central1"

def __init__(
    self,
    *,
    template: str,
    job_name: str = "{{task.task_id}}",
    options: Optional[Dict[str, Any]] = None,
    dataflow_default_options: Optional[Dict[str, Any]] = None,
    parameters: Optional[Dict[str, str]] = None,
    project_id: Optional[str] = None,
    location: str = DEFAULT_DATAFLOW_LOCATION,
    gcp_conn_id: str = "google_cloud_default",
    delegate_to: Optional[str] = None,
    poll_sleep: int = 10,
    impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
    environment: Optional[Dict] = None,
    cancel_timeout: Optional[int] = 10 * 60,
    wait_until_finished: Optional[bool] = None,
    **kwargs,
)
If you pass the argument "region": "europe-west1" in the Dataflow default options, this Airflow operator will overwrite that value with the value of its location field. That's why you have to pass the location field on the operator itself.
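Applied to your DAG, the task could look roughly like the sketch below. It keeps the placeholder values from your question; passing project_id explicitly is only an assumption on my side, since you said the Dataflow job runs in a different project than the Composer environment.

# Minimal sketch, assuming the placeholder values from the question.
# project_id is an assumption (job runs in a different project);
# the remaining dataflow_default_options are kept unchanged from the question.
start_template_job = DataflowTemplatedJobStartOperator(
    task_id="dataflow_operator_batch_bq_to_redis",
    template="gs://my-bucket-temp/template/BatchRedisUpdatePipelineTemplate",
    project_id="my-project",    # target project for the Dataflow job
    location="europe-west1",    # job region; overrides the us-central1 default
    parameters={
        "inputTopic": "inputtopic",
        "bigQueryInputProject": "inputproject",
        "bigQueryInputDataset": "dataset",
        "bigQueryInputTable": "table",
        "bigQueryInputSQLBranchMetadata": "DUMMY",
        "bigQueryInputSQLBranchSkuWeek": "DUMMY",
        "redisHost": "host",
        "redisPort": "6379",
    },
    dataflow_default_options={
        "zone": "europe-west1-b",
        "stagingLocation": "gs://my-bucket-temp/temp/",
        "tempLocation": "gs://my-bucket-temp/temp/",
        "workerMachineType": "n1-standard-1",
    },
)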
Upvotes: 4