Reputation: 111
Is it possible to set up Nagios alerts for Airflow DAGs? If a DAG fails, I need to alert the respective groups.
Upvotes: 1
Views: 643
Reputation: 436
You can add an "on_failure_callback" to any task. When the task fails, Airflow calls that function with the task's context dictionary, so it can run arbitrary failure handling. Inside that function you can then send an alert to Nagios.
For example:
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x import path

# start_date is required for the DAG to run; this date is only an example
dag = DAG(dag_id="failure_handling",
          start_date=datetime(2018, 8, 1),
          schedule_interval='@daily')

def handle_failure(context):
    # first get useful fields to send to nagios/elsewhere
    dag_id = context['dag'].dag_id
    ds = context['ds']
    task_id = context['ti'].task_id
    # instead of logging these, you can send them somewhere else
    logging.info("dag_id={}, ds={}, task_id={}".format(dag_id, ds, task_id))

def task_that_fails(**kwargs):
    raise Exception("failing test")

# task_id matches the name used in the test command
task_to_fail = PythonOperator(
    task_id='task_to_fail',
    python_callable=task_that_fails,
    provide_context=True,
    on_failure_callback=handle_failure,
    dag=dag)
If you run a test on this:
airflow test failure_handling task_to_fail 2018-08-10
You get the following in your log output:
INFO - dag_id=failure_handling, ds=2018-08-10, task_id=task_to_fail
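To actually reach Nagios from the callback, one option is to submit a passive check result through the Nagios external command file. The following is only a rough sketch under several assumptions: the command file path shown is the usual default for source installs and may differ on your system, the host name "airflow" is a placeholder, and a passive service named after each dag_id must already be defined in Nagios. The helper names format_passive_result and notify_nagios are made up for this example.

```python
import time

CRITICAL = 2  # standard Nagios return code for a CRITICAL state


def format_passive_result(host, service, return_code, output, timestamp=None):
    """Build one PROCESS_SERVICE_CHECK_RESULT external command line."""
    ts = int(time.time()) if timestamp is None else int(timestamp)
    # Commands are newline-terminated, so keep the message on a single line
    output = output.replace("\n", " ")
    return "[{}] PROCESS_SERVICE_CHECK_RESULT;{};{};{};{}\n".format(
        ts, host, service, return_code, output)


def notify_nagios(context,
                  command_file="/usr/local/nagios/var/rw/nagios.cmd",  # assumed path
                  host="airflow"):  # assumed Nagios host name
    """Failure callback that reports the failed task as a CRITICAL passive check."""
    dag_id = context['dag'].dag_id
    task_id = context['ti'].task_id
    ds = context['ds']
    message = "task {} failed for {}".format(task_id, ds)
    # Assumes a passive service named after the dag_id exists on the Nagios host
    line = format_passive_result(host, dag_id, CRITICAL, message)
    with open(command_file, "w") as f:
        f.write(line)
```

You would then pass on_failure_callback=notify_nagios to the operator instead of handle_failure. This only works if the Airflow worker can write to the command file, which usually means it runs on the Nagios host; otherwise something like NSCA would be needed to forward the result.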
Upvotes: 1