sacoder

Reputation: 189

DataflowTemplateOperator job failing on Cloud Composer after upgrading to Airflow 2

We run a number of DataflowTemplateOperator jobs (JDBC to BigQuery template) in our current Composer 1.16.0 environment with Airflow 1.10.15. However, when we try to run the same DAG under Composer 1.17.6 with Airflow 2.1.4, we get the error below:

[2021-12-07 03:08:56,478] {taskinstance.py:1465} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1166, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/dataflow.py", line 682, in execute
    job = self.hook.start_template_dataflow(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 99, in inner_wrapper
    raise AirflowException(
airflow.exceptions.AirflowException: The mutually exclusive parameter `location` and `region` key in `variables` parameter are both present. Please remove one.

We do set the region parameter in dataflow_default_options, and the options get rendered under Airflow 1 as follows:

{'gcpTempLocation': 'gs://us-east1-xxxx/tmp/',
 'machineType': 'n2-standard-2',
 'project': 'xxxx',
 'region': 'us-east1',
 'runner': 'DataflowRunner'}
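
For reference, the task is wired up roughly like the minimal sketch below (Airflow 1 contrib import path; the project, bucket, and JDBC parameters are placeholders):

    # Minimal Airflow 1 sketch - project, bucket and JDBC parameters are placeholders.
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

    with DAG("jdbc_to_bq_example", start_date=datetime(2021, 12, 1), schedule_interval=None) as dag:
        jdbc_to_bq = DataflowTemplateOperator(
            task_id="jdbc_to_bq",
            template="gs://dataflow-templates/latest/Jdbc_to_BigQuery",
            parameters={},  # JDBC connection string, query and output table omitted
            dataflow_default_options={
                "project": "xxxx",
                "region": "us-east1",  # honored under Airflow 1, rejected under Airflow 2
                "machineType": "n2-standard-2",
                "gcpTempLocation": "gs://us-east1-xxxx/tmp/",
            },
        )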

But it looks like the region parameter can no longer be set via dataflow_default_options under Airflow 2. Setting "location" instead of "region" there has no effect, and the job defaults to us-central1.

Both environments use the same template; this was verified in the Dataflow jobs console.

The reason we set the region is that we launch a number of Dataflow tasks, and if we do not pin them to us-east1 we hit CPU quotas; we had our us-east1 CPU quota increased.

Any pointers are appreciated.

Thanks.

Upvotes: 1

Views: 2266

Answers (2)

VVildVVolf

Reputation: 136

You can set the location property at the operator level instead of using region:

    init_kwargs = {
        ...
        "location": "us-east1",
        "dataflow_default_options": {
            # "region": "us-east1",
        },
        ...
    }
    operator = DataflowTemplatedJobStartOperator(**init_kwargs)

or explicitly hardcode location to None:

    init_kwargs = {
        ...
        "location": None,
        "dataflow_default_options": {
            "region": "us-east1",
        },
        ...
    }
    operator = DataflowTemplatedJobStartOperator(**init_kwargs)

P.S. The fix has already been merged (https://github.com/apache/airflow/commit/810b5d4da4396cedcd483d20e50873c2b81cf5ad) and might be included in the 2.6.1 release or later.

Upvotes: 1

Betjens

Reputation: 1401

It's good to know that you were able to solve your issue. I'm leaving this answer for community visibility, covering the current versions of the DataflowTemplateOperator. Feel free to update the answer if you see fit.

You can also find the official DataflowTemplateOperator usage samples for both versions in the Airflow documentation.
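
As a rough sketch of what this looks like under Airflow 2, with the region passed through the operator's location argument rather than dataflow_default_options (the template path, project, and bucket below are placeholders):

    # Minimal Airflow 2 sketch - project, bucket and JDBC parameters are placeholders.
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator

    with DAG("jdbc_to_bq_example", start_date=datetime(2021, 12, 1), schedule_interval=None) as dag:
        jdbc_to_bq = DataflowTemplatedJobStartOperator(
            task_id="jdbc_to_bq",
            template="gs://dataflow-templates/latest/Jdbc_to_BigQuery",
            project_id="xxxx",
            location="us-east1",  # replaces the "region" key from dataflow_default_options
            parameters={},        # JDBC connection string, query and output table omitted
            dataflow_default_options={
                "machineType": "n2-standard-2",
                "gcpTempLocation": "gs://us-east1-xxxx/tmp/",
            },
        )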

Upvotes: 3
