Reputation: 219
I need help with this issue.
We recently migrated from Cloud Composer 1 to Cloud Composer 2, but we have issues when Composer tries to check the Dataflow job status. Dataflow starts correctly, but the Composer worker is often unable to check the job status. It happens only on some flows, even though the DAGs of those flows have the same structure and parameters as the ones that finish correctly.
Here is a DAG that always fails:
import datetime

import pendulum

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': pendulum.yesterday("Europe/Rome"),
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    'depends_on_past': False,
    # If a task fails, retry it once after waiting at least 20 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=20),
    'project_id': 'my-project-id',
    'location': 'europe-west1',
    'dataflow_default_options': {
        'zone': 'europe-west1-b',
        'tempLocation': 'gs://...omissis.../temp/'
    }
}

with models.DAG(
        'run_agent_performance_report',
        schedule_interval=datetime.timedelta(hours=12),
        default_args=default_dag_args,
        max_active_runs=1) as dag:

    run_agent_performance_report = DataflowTemplatedJobStartOperator(
        task_id='run_agent_performance_report',
        template='gs://...omissis...-template',
        job_name='agent-performance-report')

    run_agent_performance_report
I think the issue may be related to the location/zone configuration: Composer 2 runs in an Autopilot GKE cluster that spans three zones, while Dataflow runs only in europe-west1-b.
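One thing I considered, just as a sketch of what I mean (project ID and template path are the same placeholders as above, and I have not confirmed this fixes anything), was setting the location explicitly on the operator itself rather than only in default_args, and asking the operator to block until the job reaches a terminal state via its wait_until_finished flag, if I am reading the provider documentation correctly:

    # Untested sketch: pass the region explicitly to the operator so status
    # polling targets the europe-west1 regional endpoint.
    run_agent_performance_report = DataflowTemplatedJobStartOperator(
        task_id='run_agent_performance_report',
        template='gs://...omissis...-template',
        job_name='agent-performance-report',
        project_id='my-project-id',
        location='europe-west1',
        # Assumption based on the provider docs: makes the task wait for a
        # terminal Dataflow job state before succeeding.
        wait_until_finished=True,
    )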
The Dataflow job ends correctly (in the Dataflow jobs report it is marked as successful), but Airflow reports it as unresponsive; sometimes it seems that Airflow does not even try to check the job state. The DAG just "hangs"...
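To clarify what I mean by "does not even try to check the job state": I also wondered whether splitting the start and the status check into two tasks would at least make the hang visible. Something like the following, inside the same DAG (untested sketch; I am not sure of the exact XCom key the start operator pushes, so treat the job_id expression as an assumption):

    from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
    from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

    # Untested sketch: a separate sensor task that only polls the job state,
    # so a hang in the status check would surface here, not in the start task.
    wait_for_report = DataflowJobStatusSensor(
        task_id='wait_for_agent_performance_report',
        # Assumption: the start operator pushes the Dataflow job object to
        # XCom and its id is available under the 'id' key.
        job_id="{{ task_instance.xcom_pull('run_agent_performance_report')['id'] }}",
        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
        project_id='my-project-id',
        location='europe-west1',
    )

    run_agent_performance_report >> wait_for_report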
How can we fix this? Thanks
Upvotes: 0
Views: 20