Reputation: 117
I am running Airflow (version 2.5.3) DAGs on GCP Cloud Composer with several tasks that trigger a Java-based Dataflow job. The code of one such task looks like this:
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

trigger_dataflow = BeamRunJavaPipelineOperator(
    task_id="template_task_id",
    dag=dag,
    task_concurrency=1,
    depends_on_past=True,
    runner="DataflowRunner",
    jar=jar_path,
    job_class=jar_class,
    pipeline_options={
        "task": "template_task",
        "sql": "template_sql",
        "bigtableInstanceId": bt_instance_id,
        **({"network": dataflow_network} if dataflow_network is not None else {}),
        **({"subnetwork": dataflow_subnetwork} if dataflow_subnetwork is not None else {}),
    },
    dataflow_config=DataflowConfiguration(
        job_name="template_job_name",
        # intended to poll the Dataflow job status every 60 seconds
        poll_sleep=60,
        project_id=dataflow_project_id,
        service_account=dataflow_service_account,
    ),
    params={
        "tableId": "template_table",
    },
)
The task keeps failing with return code Negsignal.SIGKILL. After some investigation I realized this code indicates that the workers are running out of resources, which causes the task to be force-killed.
I then looked at the Airflow task logs and found that the task checks the status of the Dataflow job every second, so I suspect that the large number of requests to the Dataflow side is causing the issue.
I tried to increase the sleep time between Dataflow status polls by setting the poll_sleep field to 60 in dataflow_config, but when I reran the Airflow task, it still polled Dataflow every second.
What's wrong with my approach? Or is there a suggested way to do this?
Upvotes: 0
Views: 1121
Reputation: 2525
If tasks are getting Negsignal.SIGKILL and it's due to insufficient resources, I recommend either increasing the size of the workers or decreasing worker_concurrency (assuming you are using the CeleryExecutor).
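As a rough sketch of the second option: worker_concurrency lives in the [celery] section of the Airflow configuration (in Cloud Composer you would set it via an Airflow configuration override on section celery, key worker_concurrency). The value 6 below is only an illustration; size it to your workers' memory.

    # airflow.cfg (or the equivalent Airflow configuration override in Cloud Composer)
    [celery]
    # Number of task instances a single Celery worker runs at once.
    # Lowering this reduces per-worker memory pressure.
    worker_concurrency = 6

    # Equivalent environment variable:
    # AIRFLOW__CELERY__WORKER_CONCURRENCY=6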
Upvotes: 1