JosephZoom94

Reputation: 117

Airflow tasks failed with return code Negsignal.SIGKILL

I am running Airflow (version 2.5.3) DAGs on GCP Cloud Composer. The DAGs have several tasks that each trigger a Java-based Dataflow job. The code for such a task looks like this:

from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

trigger_dataflow = BeamRunJavaPipelineOperator(
    task_id="template_task_id",
    dag=dag,
    task_concurrency=1,
    depends_on_past=True,
    runner="DataflowRunner",
    jar=jar_path,
    job_class=jar_class,
    pipeline_options={
        'task': 'template_task',
        'sql': "template_sql",
        'bigtableInstanceId': bt_instance_id,
        # only pass network/subnetwork when they are configured
        **({"network": dataflow_network} if dataflow_network is not None else {}),
        **({"subnetwork": dataflow_subnetwork} if dataflow_subnetwork is not None else {}),
    },
    dataflow_config=DataflowConfiguration(
        job_name='template_job_name',
        poll_sleep=60,  # intended to slow down Dataflow job status polling
        project_id=dataflow_project_id,
        service_account=dataflow_service_account,
    ),
    params={
        "tableId": "template_table"
    }
)

The task keeps failing with return code Negsignal.SIGKILL. After some investigation, I realized this code indicates that the workers lack resources, which causes the task to be force-killed.

I also took a look at the Airflow task logs and found that the task keeps checking the status of the Dataflow job every second. I suspect that the large number of requests made to the Dataflow side caused the issue.

I tried to increase the sleep time between Dataflow status polls by setting the poll_sleep field to 60 in dataflow_config, but when I rerun the Airflow task, it still checks Dataflow every second.

What's wrong with my approach? Is there a suggested way to do this?

Upvotes: 0

Views: 1121

Answers (1)

RNHTTR

Reputation: 2525

If tasks are getting Negsignal.SIGKILL and it's due to insufficient resources, I recommend either increasing the size of the workers or decreasing worker_concurrency (assuming you are using the CeleryExecutor).
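For example, worker_concurrency lives in the [celery] section of the Airflow configuration; on Cloud Composer you can set it as an Airflow configuration override (section celery, key worker_concurrency). The value below is just a placeholder; pick one based on your workers' memory:

    [celery]
    # Fewer task slots per worker leaves more memory for each running task,
    # which reduces the chance of the OS OOM-killing a task (Negsignal.SIGKILL).
    worker_concurrency = 6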

Upvotes: 1
