tank
tank

Reputation: 505

Not able to GET the state of Dataflow job in Airflow

Recently I upgraded the Apache Beam to 2.20.0 and then my Airflow tasks of Dataflow job started to fail although the dataflow job itself succeeded.

I noticed that after the upgrade the GET API for dataflow is using location instead of job id in the URL

GET https://dataflow.googleapis.com/v1b3/projects/umg-de/locations/us-central1/jobs/us-central1?alt=json

Ideally the URL should be like this

GET https://dataflow.googleapis.com/v1b3/projects/umg-de/locations/us-central1/jobs/{job_id}?alt=json

Can somebody explain why this is happening?

Upvotes: 0

Views: 1056

Answers (1)

itroulli
itroulli

Reputation: 2094

This is a known Airflow issue. The fix has been merged to the Airflow master but has not been released yet.

As a workaround you can either use Apache Beam SDK 2.19.0 or implement the fix in a custom hook (identical to dataflow_hook.py but with the suggested change applied) and then implement a custom operator that uses this hook. Here is how I did it:

First, I created a file named my_dataflow_hook.py:

import re

from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook, _Dataflow, _DataflowJob
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook


class _myDataflow(_Dataflow):
    @staticmethod
    def _extract_job(line):
        job_id_pattern = re.compile(
            br".*console.cloud.google.com/dataflow.*/jobs/.*/([a-z|0-9|A-Z|\-|\_]+).*")
        matched_job = job_id_pattern.search(line or '')
        if matched_job:
            return matched_job.group(1).decode()


class MyDataFlowHook(DataFlowHook):
    @GoogleCloudBaseHook._Decorators.provide_gcp_credential_file
    def _start_dataflow(self, variables, name, command_prefix, label_formatter):
        variables = self._set_variables(variables)
        cmd = command_prefix + self._build_cmd(variables, label_formatter)
        job_id = _myDataflow(cmd).wait_for_done()
        _DataflowJob(self.get_conn(), variables['project'], name,
                     variables['region'],
                     self.poll_sleep, job_id,
                     self.num_retries).wait_for_done()

Then, I created a file named my_dataflow_python.py:

import re

from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator, GoogleCloudBucketHelper
from hooks.my_dataflow_hook import MyDataFlowHook
from airflow.plugins_manager import AirflowPlugin


class MyDataFlowPythonOperator(DataFlowPythonOperator):
    def execute(self, context):
        """Execute the python dataflow job."""
        bucket_helper = GoogleCloudBucketHelper(
            self.gcp_conn_id, self.delegate_to)
        self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
        hook = MyDataFlowHook(gcp_conn_id=self.gcp_conn_id,
                            delegate_to=self.delegate_to,
                            poll_sleep=self.poll_sleep)
        dataflow_options = self.dataflow_default_options.copy()
        dataflow_options.update(self.options)
        # Convert argument names from lowerCamelCase to snake case.
        camel_to_snake = lambda name: re.sub(
            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
        formatted_options = {camel_to_snake(key): dataflow_options[key]
                             for key in dataflow_options}
        hook.start_python_dataflow(
            self.job_name, formatted_options,
            self.py_file, self.py_options)

class MyDataFlowPlugin(AirflowPlugin):
    """Expose Airflow operators."""

    name = 'dataflow_fix_plugin'
    operators = [MyDataFlowPythonOperator]

Finally, I uploaded these files into the bucket of the Composer environment following this structure:

├── dags
│   └── my_dag.py
└── plugins
    ├── hooks
    │   └── my_dataflow_hook.py
    └── my_dataflow_python.py

Now, I can create tasks with MyDataFlowPythonOperator in my DAGs:

from airflow import DAG
from airflow.operators.dataflow_fix_plugin import MyDataFlowPythonOperator
...
with DAG("df-python-test", default_args=default_args) as dag:
    test_task = MyDataFlowPythonOperator(dag=dag, task_id="df-python", py_file=PY_FILE, job_name=JOB_NAME)

Upvotes: 1

Related Questions