nsandersen

Reputation: 976

Composer/Airflow struggles running Dataflow jobs on preemptible VMs

I have a Composer/Airflow instance in GCP (version composer-1.12.2-airflow-1.10.10) that runs its current workload acceptably (approx. 40 DAGs, each with 6 Dataflow jobs and 18 other shorter tasks). The Dataflow jobs are launched from a Dataflow template (a normal template, not a flex template) and typically take around 20 minutes to complete, partly in parallel. I start them with a Python DataflowTemplatedJobStartOperator.

If I use the Dataflow option --flexRSGoal=COST_OPTIMIZED, the instance clogs up. At best it manages to schedule one Dataflow job per minute over the first 20 minutes, at high CPU usage. Tasks accumulate, slowing the scheduler down further until it almost stops scheduling entirely.

The flexRSGoal setting is the only difference between the working and problematic setup.

I suspect the DataflowTemplatedJobStartOperator doesn't properly support the situation where Dataflow jobs sit in the "Queued" state for a while before they start - or that I need to set/adjust some other parameter to make this work. Does anybody have an idea? Thank you.

Upvotes: 0

Views: 898

Answers (1)

Tlaquetzal

Reputation: 2850

Airflow version 1.10.10 doesn't recognize QUEUED as a possible state for Dataflow. You can see this by following the execution path of the template operator:

  • Here the underlying DataflowHook is called to start the template
  • The start_template_dataflow() method calls a "private" _start_template_dataflow() method
  • Inside _start_template_dataflow() there's a call to wait_for_done() method in the _DataflowJob class
  • Finally, in the wait_for_done() method we can see that there's an if/else block handling the expected job status where QUEUED is not considered

This is the same for other Airflow versions supported by Composer, i.e., 1.10.6, 1.10.9 and even the most recent 1.10.12.
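The consequence of that missing branch can be seen in a stripped-down sketch of the if/else dispatch (the function below is a simplified, hypothetical reduction of the hook's logic, not the actual Airflow code):

```python
# Simplified sketch of the wait_for_done() dispatch in Airflow 1.10.x:
# any state without an explicit branch falls through to the final else.
def check_state(current_state):
    if current_state == 'JOB_STATE_DONE':
        return True                     # job finished, stop waiting
    elif current_state in ('JOB_STATE_RUNNING', 'JOB_STATE_PENDING'):
        return False                    # keep polling
    elif current_state in ('JOB_STATE_FAILED', 'JOB_STATE_CANCELLED'):
        raise Exception("job ended in state {}".format(current_state))
    else:
        # FlexRS jobs report JOB_STATE_QUEUED, which lands here
        raise Exception("job was unknown state: {}".format(current_state))
```

Because JOB_STATE_QUEUED matches none of the explicit branches, a FlexRS job hits the final else as soon as the hook polls it.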

As a workaround, my suggestion is to use a monkey patch to handle the QUEUED state. For example, you can add the following code to your DAG file to replace the wait_for_done() method of _DataflowJob at runtime:

from airflow.contrib.hooks.gcp_dataflow_hook import _DataflowJob
import time

def wait_for_done(self):
    while True:
        if self._job and 'currentState' in self._job:
            if 'JOB_STATE_DONE' == self._job['currentState']:
                return True
            elif 'JOB_STATE_RUNNING' == self._job['currentState'] and \
                'JOB_TYPE_STREAMING' == self._job['type']:
                return True
            elif 'JOB_STATE_FAILED' == self._job['currentState']:
                raise Exception("Google Cloud Dataflow job {} has failed.".format(
                    self._job['name']))
            elif 'JOB_STATE_CANCELLED' == self._job['currentState']:
                raise Exception("Google Cloud Dataflow job {} was cancelled.".format(
                    self._job['name']))
            elif 'JOB_STATE_RUNNING' == self._job['currentState']:
                time.sleep(self._poll_sleep)
            elif 'JOB_STATE_PENDING' == self._job['currentState']:
                time.sleep(15)
            elif 'JOB_STATE_QUEUED' == self._job['currentState']:
                # Uncomment the desired behavior (the `pass` only keeps the
                # block syntactically valid; on its own it re-polls without
                # sleeping):
                # time.sleep(15)  # as if QUEUED were a PENDING state
                # time.sleep(self._poll_sleep)  # as if QUEUED were a RUNNING state
                # return True  # as if QUEUED were a final state
                pass
            else:
                self.log.debug(str(self._job))
                raise Exception(
                    "Google Cloud Dataflow job {} was unknown state: {}".format(
                        self._job['name'], self._job['currentState']))
        else:
            time.sleep(15)

        self._job = self._get_job()

_DataflowJob.wait_for_done = wait_for_done

The difference with the original code is the elif statement where we are looking for the QUEUED state:

    elif 'JOB_STATE_QUEUED' == self._job['currentState']:
        # Uncomment the desired behavior (the `pass` only keeps the
        # block syntactically valid; on its own it re-polls without
        # sleeping):
        # time.sleep(15)  # as if QUEUED were a PENDING state
        # time.sleep(self._poll_sleep)  # as if QUEUED were a RUNNING state
        # return True  # as if QUEUED were a final state
        pass

Notice that I left commented the three most natural ways to handle this state, i.e., sleep (15 seconds or the poll_sleep time) to wait for the job to execute and complete, or simply return True and skip waiting for the execution. You can uncomment the line that you want to execute, or you can even add your own logic there.
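The patch itself relies on nothing more than Python's ability to rebind a class attribute at runtime; here is a minimal, self-contained illustration of the technique (the class and method names below are hypothetical, not from Airflow):

```python
class JobWatcher:
    """Stand-in for a class whose method we want to override (hypothetical)."""
    def wait_for_done(self):
        return "original behavior"

def patched_wait_for_done(self):
    # Replacement logic would go here, e.g. handling extra job states.
    return "patched behavior"

watcher = JobWatcher()

# Rebinding the class attribute affects every instance,
# including ones created before the patch was applied.
JobWatcher.wait_for_done = patched_wait_for_done

print(watcher.wait_for_done())  # prints "patched behavior"
```

This is why assigning to `_DataflowJob.wait_for_done` at the top of the DAG file is enough: every job the operator polls afterwards goes through the replacement method.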

Upvotes: 1
