Maria Dorohin

Reputation: 373

How can I capture exceptions thrown by a Java application within Airflow's on_failure_callback?

I am using Airflow to run KubernetesPodOperator tasks that run a Java application image.

The Java application uses Beam on Dataflow.

My custom Airflow operators inherit from KubernetesPodOperator.

I am trying to catch the Java application's exceptions in Airflow's on_failure_callback, but I am getting an AirflowException instead of the real exception.

I cannot add a try/catch in my Java code because I am using Beam, and pipeline.run() and result.waitUntilFinish() don't directly throw exceptions that I can catch in a try-catch block. result.waitUntilFinish() only surfaces exceptions that occur during the job's setup; it does not capture runtime exceptions that happen while the data is being processed.

How can I get the Java application's exception in on_failure_callback?

def task_failure(context):
    exception = context.get('exception')
    if exception:
        # For a failed KubernetesPodOperator this is an AirflowException,
        # not the original Java exception.
        exception_type = type(exception).__name__
        print(f"Task failed with {exception_type}: {exception}")


default_args = {
    'on_failure_callback': task_failure
}

with models.DAG(
        dag_name,
        default_args=default_args,
        start_date=start_date,
        schedule_interval=interval,
) as dag:
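One workaround sketch: since the pod's stdout (including the Java stack trace) ends up in the task log, the callback can scan the log text for Java exception patterns instead of relying on context['exception']. This is an assumption-laden sketch; the regex below targets typical Java stack-trace first lines, and obtaining the log text itself (from the worker's log file or the API) is left to your deployment.

```python
import re

# Matches the first line of a typical Java stack trace, e.g.
# "java.lang.RuntimeException: something went wrong" or
# 'Exception in thread "main" java.io.IOException: ...'
JAVA_EXCEPTION_RE = re.compile(
    r'(?:Exception in thread "[^"]*" )?'
    r'((?:[a-zA-Z_$][\w$]*\.)+[A-Z]\w*(?:Exception|Error))'
    r'(?::\s*(.*))?'
)


def extract_java_exception(log_text):
    """Return (exception_class, message) for the first Java exception
    found in the log text, or None if nothing matches."""
    for line in log_text.splitlines():
        m = JAVA_EXCEPTION_RE.search(line)
        if m:
            return m.group(1), (m.group(2) or '').strip()
    return None
```

You could call a helper like this from task_failure once you have the log contents, and e.g. push the result to XCom or an alerting system.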

versions:

Upvotes: -1

Views: 49

Answers (1)

Maria Dorohin

Reputation: 373

Kubernetes does not split the pod log into separate stdout/stderr streams, so as a workaround:

I am going to read the Airflow task log from an external service via the Airflow API and search it for specific exceptions.
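A minimal sketch of that approach, assuming an Airflow 2 deployment with the stable REST API enabled and basic auth (the URL, credentials, and pattern list below are placeholders to adjust):

```python
import requests

# Hypothetical values -- adjust to your deployment.
AIRFLOW_URL = "http://localhost:8080"
AUTH = ("admin", "admin")  # your deployment may use a token instead


def fetch_task_log(dag_id, run_id, task_id, try_number=1):
    """Fetch a task instance's log via the Airflow 2 stable REST API."""
    url = (f"{AIRFLOW_URL}/api/v1/dags/{dag_id}/dagRuns/{run_id}"
           f"/taskInstances/{task_id}/logs/{try_number}")
    resp = requests.get(
        url,
        auth=AUTH,
        headers={"Accept": "text/plain"},
        params={"full_content": "true"},
    )
    resp.raise_for_status()
    return resp.text


def find_exceptions(log_text, patterns=("Exception", "Error")):
    """Return the log lines that mention any of the given patterns."""
    return [line for line in log_text.splitlines()
            if any(p in line for p in patterns)]
```

The external service would call fetch_task_log for failed task instances and feed the result to find_exceptions to pick out the Java stack traces.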

Upvotes: 0
