Reputation: 505
Recently I upgraded Apache Beam to 2.20.0, and then my Airflow tasks for Dataflow jobs started to fail, even though the Dataflow jobs themselves succeeded.
I noticed that after the upgrade, the Dataflow GET API call uses the location instead of the job ID in the URL:
GET https://dataflow.googleapis.com/v1b3/projects/umg-de/locations/us-central1/jobs/us-central1?alt=json
Ideally the URL should look like this:
GET https://dataflow.googleapis.com/v1b3/projects/umg-de/locations/us-central1/jobs/{job_id}?alt=json
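For comparison, the lookup I would expect corresponds to something like this sketch with the google-api-python-client (the job ID below is just a placeholder):

from googleapiclient.discovery import build

# Placeholder: the real Dataflow job ID that should appear in the URL above.
job_id = "2020-05-01_12_34_56-1234567890123456789"

dataflow = build("dataflow", "v1b3")
job = dataflow.projects().locations().jobs().get(
    projectId="umg-de",
    location="us-central1",
    jobId=job_id,
).execute()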
Can somebody explain why this is happening?
Upvotes: 0
Views: 1056
Reputation: 2094
This is a known Airflow issue. The fix has been merged into Airflow master but has not been released yet.
As a workaround, you can either use Apache Beam SDK 2.19.0 or implement the fix in a custom hook (identical to dataflow_hook.py but with the suggested change applied) and then implement a custom operator that uses this hook. Here is how I did it:
First, I created a file named my_dataflow_hook.py:
import re

from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook, _Dataflow, _DataflowJob
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook


class _myDataflow(_Dataflow):
    @staticmethod
    def _extract_job(line):
        # Capture the last path segment after /jobs/, so the job ID is
        # returned rather than the location (this is the suggested change).
        job_id_pattern = re.compile(
            br".*console.cloud.google.com/dataflow.*/jobs/.*/([a-z|0-9|A-Z|\-|\_]+).*")
        matched_job = job_id_pattern.search(line or '')
        if matched_job:
            return matched_job.group(1).decode()


class MyDataFlowHook(DataFlowHook):
    @GoogleCloudBaseHook._Decorators.provide_gcp_credential_file
    def _start_dataflow(self, variables, name, command_prefix, label_formatter):
        variables = self._set_variables(variables)
        cmd = command_prefix + self._build_cmd(variables, label_formatter)
        job_id = _myDataflow(cmd).wait_for_done()
        _DataflowJob(self.get_conn(), variables['project'], name,
                     variables['region'],
                     self.poll_sleep, job_id,
                     self.num_retries).wait_for_done()
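If you want to verify that the overridden _extract_job now returns the job ID instead of the location, you can run it against a log line in the URL format that Beam 2.20.0 prints (the line below is made up for illustration):

# Made-up log line in the newer console URL format (.../dataflow/jobs/<location>/<job_id>).
sample_line = (b"INFO: To access the Dataflow monitoring console, please navigate to "
               b"https://console.cloud.google.com/dataflow/jobs/us-central1/"
               b"2020-05-01_12_34_56-1234567890123456789?project=umg-de")

print(_myDataflow._extract_job(sample_line))
# 2020-05-01_12_34_56-1234567890123456789  (the job ID, not "us-central1")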
Then, I created a file named my_dataflow_python.py:
import re

from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator, GoogleCloudBucketHelper
from airflow.plugins_manager import AirflowPlugin

from hooks.my_dataflow_hook import MyDataFlowHook


class MyDataFlowPythonOperator(DataFlowPythonOperator):
    def execute(self, context):
        """Execute the python dataflow job."""
        bucket_helper = GoogleCloudBucketHelper(
            self.gcp_conn_id, self.delegate_to)
        self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
        hook = MyDataFlowHook(gcp_conn_id=self.gcp_conn_id,
                              delegate_to=self.delegate_to,
                              poll_sleep=self.poll_sleep)
        dataflow_options = self.dataflow_default_options.copy()
        dataflow_options.update(self.options)
        # Convert argument names from lowerCamelCase to snake case.
        camel_to_snake = lambda name: re.sub(
            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
        formatted_options = {camel_to_snake(key): dataflow_options[key]
                             for key in dataflow_options}
        hook.start_python_dataflow(
            self.job_name, formatted_options,
            self.py_file, self.py_options)


class MyDataFlowPlugin(AirflowPlugin):
    """Expose Airflow operators."""
    name = 'dataflow_fix_plugin'
    operators = [MyDataFlowPythonOperator]
Finally, I uploaded these files into the bucket of the Composer environment following this structure:
├── dags
│   └── my_dag.py
└── plugins
    ├── hooks
    │   └── my_dataflow_hook.py
    └── my_dataflow_python.py
Now, I can create tasks with MyDataFlowPythonOperator in my DAGs:
from airflow import DAG
from airflow.operators.dataflow_fix_plugin import MyDataFlowPythonOperator
...
with DAG("df-python-test", default_args=default_args) as dag:
    test_task = MyDataFlowPythonOperator(
        dag=dag,
        task_id="df-python",
        py_file=PY_FILE,
        job_name=JOB_NAME,
    )
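For completeness, the names referenced above (PY_FILE, JOB_NAME, default_args) can be defined with placeholder values along these lines; the bucket paths, project and dates below are assumptions, not values from my environment:

import datetime

# Placeholder values -- adjust the bucket, pipeline file and project to your setup.
PY_FILE = "gs://my-bucket/dataflow/my_pipeline.py"
JOB_NAME = "df-python-test"

default_args = {
    "owner": "airflow",
    "start_date": datetime.datetime(2020, 5, 1),
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    # Entries in default_args are applied to matching operator arguments,
    # so this dict is passed through as dataflow_default_options.
    "dataflow_default_options": {
        "project": "my-project",
        "region": "us-central1",
        "temp_location": "gs://my-bucket/tmp/",
    },
}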
Upvotes: 1