Reputation: 976
I have a Composer/Airflow instance in GCP (version composer-1.12.2-airflow-1.10.10), which runs its current jobs acceptably (approx. 40 DAGs, each with 6 Dataflow jobs and 18 other shorter tasks). The Dataflow jobs are run from a Dataflow template (a normal template, not a flex template), typically take around 20 minutes to complete, and run partly in parallel. I start them with a Python DataflowTemplatedJobStartOperator.
If I use the Dataflow option --flexRSGoal=COST_OPTIMIZED, the instance clogs up. At best it manages to schedule about one Dataflow job per minute during the first 20 minutes, at high CPU usage. This results in tasks accumulating, which slows it down even more until it almost stops scheduling.
The flexRSGoal setting is the only difference between the working and problematic setup.
I suspect the DataflowTemplatedJobStartOperator doesn't properly support the situation where Dataflow jobs stay in the "Queued" state for a while before they start, or that I need to set/adjust some other parameters to make this work. Does anybody have an idea? Thank you.
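For context, a minimal sketch of how one of these templated-job tasks might be declared (this is not my actual code: it uses the contrib DataflowTemplateOperator shipped with Airflow 1.10.x, which takes roughly the same arguments as DataflowTemplatedJobStartOperator, and every project, bucket and template name is a placeholder):
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

default_args = {'start_date': datetime(2020, 1, 1)}

with DAG('dataflow_template_example', schedule_interval='@daily',
         default_args=default_args, catchup=False) as dag:

    launch_template = DataflowTemplateOperator(
        task_id='launch_template',
        template='gs://my-bucket/templates/my_template',  # classic (non-flex) template
        dataflow_default_options={
            'project': 'my-project',
            'tempLocation': 'gs://my-bucket/tmp',
        },
        parameters={},  # runtime parameters of the template, if any
        poll_sleep=10,  # how often the hook polls the Dataflow job state
        # --flexRSGoal=COST_OPTIMIZED is assumed to be set when the template
        # itself is created, not passed here.
    )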
Upvotes: 0
Views: 898
Reputation: 2850
Airflow version 1.10.10 doesn't recognize QUEUED as a possible state for Dataflow. You can see this by following the execution path of the template operator:
- The DataflowHook is called to start the template.
- Its start_template_dataflow() method calls a "private" _start_template_dataflow() method.
- Inside _start_template_dataflow() there's a call to the wait_for_done() method of the _DataflowJob class.
- In the wait_for_done() method we can see that there's an if/else block handling the expected job states, where QUEUED is not considered.
This is the same for other Airflow versions supported by Composer, i.e., 1.10.6, 1.10.9 and even the most recent 1.10.12.
As a workaround, my suggestion is to use a monkey patch to handle the QUEUED state. For example, you can add the following code to your DAG file to replace, at runtime, the wait_for_done() method in _DataflowJob:
from airflow.contrib.hooks.gcp_dataflow_hook import _DataflowJob
import time


def wait_for_done(self):
    while True:
        if self._job and 'currentState' in self._job:
            if 'JOB_STATE_DONE' == self._job['currentState']:
                return True
            elif 'JOB_STATE_RUNNING' == self._job['currentState'] and \
                    'JOB_TYPE_STREAMING' == self._job['type']:
                return True
            elif 'JOB_STATE_FAILED' == self._job['currentState']:
                raise Exception("Google Cloud Dataflow job {} has failed.".format(
                    self._job['name']))
            elif 'JOB_STATE_CANCELLED' == self._job['currentState']:
                raise Exception("Google Cloud Dataflow job {} was cancelled.".format(
                    self._job['name']))
            elif 'JOB_STATE_RUNNING' == self._job['currentState']:
                time.sleep(self._poll_sleep)
            elif 'JOB_STATE_PENDING' == self._job['currentState']:
                time.sleep(15)
            elif 'JOB_STATE_QUEUED' == self._job['currentState']:
                # Pick the behavior desired for QUEUED (one option must stay active)
                time.sleep(15)  # As if QUEUED were a PENDING state
                # time.sleep(self._poll_sleep)  # As if QUEUED were a RUNNING state
                # return True  # As if QUEUED were a final state
            else:
                self.log.debug(str(self._job))
                raise Exception(
                    "Google Cloud Dataflow job {} was unknown state: {}".format(
                        self._job['name'], self._job['currentState']))
        else:
            time.sleep(15)
        self._job = self._get_job()


_DataflowJob.wait_for_done = wait_for_done
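Since the patch simply reassigns the method on the class, it only needs to sit at the top level of the DAG file (or a module it imports) so that it runs at parse time, before the operator's execute() creates the _DataflowJob instance and calls wait_for_done(). Note that it targets airflow.contrib.hooks.gcp_dataflow_hook; if your tasks go through the backport providers' Dataflow operator instead of the contrib one, the patch would have to target the equivalent class in airflow.providers.google.cloud.hooks.dataflow.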
The difference with the original code is the elif statement where we are looking for the QUEUED state:
            elif 'JOB_STATE_QUEUED' == self._job['currentState']:
                # Pick the behavior desired for QUEUED (one option must stay active)
                time.sleep(15)  # As if QUEUED were a PENDING state
                # time.sleep(self._poll_sleep)  # As if QUEUED were a RUNNING state
                # return True  # As if QUEUED were a final state
Notice that these are the three most natural ways to handle this state: sleep (15 seconds or the poll_sleep time) to wait for the job to execute and complete, or simply return True and don't wait for the execution. The first one is left active by default; swap in the line that you want to execute instead, or even add your own logic here.
Upvotes: 1