Werner

Reputation: 95

Airflow: Monitoring solutions for a DAG run

I'm currently trying to set up monitoring for Airflow that would ideally send out an email when a DAG has been executed, containing some information about all the contained tasks, such as each task's final status, runtime, etc.

The points I currently fail to resolve are:

- Does it make sense to have the mail-sending as a component within the DAG? If so, how could I then assure in a simple way that the task will run after all other tasks?
- How can I get the states of all task instances associated with a DAG run?

On top of these, I have the additional pickle that the solution I'm looking for has to be simple, in the sense that it should either be just 2-3 lines of code or generalizable into a Python function, as my less Python-experienced colleagues have to be able to understand and reproduce the steps on other DAGs.

Smarter ideas about how to set up the email sending are very welcome.

Thank you in advance for all suggestions!

Upvotes: 1

Views: 2432

Answers (1)

Alan Ma

Reputation: 591

> Does it make sense to have the mail-sending as a component within the DAG? If so, how could I then assure in a simple way that the task will run after all other tasks?

I think that is one way to go about achieving what you want. You can create a task that connects all the "leaves" (tasks with no downstream dependencies) to a final task that emails the state of the DAG run (still running in this scenario), including the state of the other tasks.

    from airflow.operators.python_operator import PythonOperator
    from airflow.utils.trigger_rule import TriggerRule

    def send_task_summary(**context):
        # Each TaskInstance of the current dag_run carries state, start/end
        # dates, duration, etc.
        tis = context['dag_run'].get_task_instances()
        for ti in tis:
            print(ti.__dict__)

    dag = DAG(...)

    job_status = PythonOperator(
        task_id='_job_status',
        python_callable=send_task_summary,
        provide_context=True,  # Airflow 1.x: pass the context into the callable
        trigger_rule=TriggerRule.ALL_DONE,  # run even when upstream tasks fail
        dag=dag
    )

    # Wire every leaf task (except the summary task itself) upstream of
    # job_status, so the summary always runs last.
    leaves = [task for task in dag.tasks if not task.downstream_list]
    exclude = ['_job_status']
    for l in leaves:
        if l.task_id not in exclude:
            job_status.set_upstream(l)

> How can I get the states of all task instances associated with a DAG run?

Instead of the EmailOperator, I would suggest the PythonOperator, since you will need the context, which contains the information you need to grab the state of the tasks. Building off the snippet above, I leveraged the send_email utility to send an email.

    from airflow.utils.email import send_email

    def send_task_summary(**context):
        task = context['task']     # the operator executing this callable
        dr = context['dag_run']    # the current DagRun
        # Airflow 1.x signature: render_template(attr, content, context);
        # renders the Jinja template with the full task context.
        body = task.render_template(None, "path/to/template", context)
        # send_email uses the SMTP settings from the [smtp] section of airflow.cfg.
        send_email(to="[email protected]", subject=f"{dr} summary", html_content=body)

You can also use Jinja templating to build your email.

    <html>
        <body>
            <div>
                <table>
                    {% for ti in dag_run.get_task_instances() -%}
                        <tr>
                            <td class='{{ti.state}}'>
                                <a href='{{ host_server }}/admin/airflow/log?execution_date={{ts}}&task_id={{ti.task_id}}&dag_id={{dag.dag_id}}'>{{ti.state}}</a></td>
                            <td class='{{ti.operator}}'>
                                <a href='{{ host_server }}/admin/airflow/graph?root={{ti.task_id}}&dag_id={{dag.dag_id}}&execution_date={{ts}}'>{{ti.task_id}}</a></td>
                            <td><a href='{{ host_server }}/admin/airflow/tree?base_date={{ts}}&num_runs=50&root={{ti.task_id}}&dag_id={{dag.dag_id}}'>{{ti.start_date}}</a></td>
                            <td><a href='{{ host_server }}/admin/airflow/gantt?root={{ti.task_id}}&dag_id={{dag.dag_id}}&execution_date={{ts}}'>{{ti.end_date}}</a></td>
                            <td><a href='{{ host_server }}/admin/airflow/duration?root={{ti.task_id}}&base_date={{ts}}&days=9999&dag_id={{dag.dag_id}}'>{{ti.duration}}</a></td>
                        </tr>
                    {% endfor -%}
                </table>
            </div>
        </body>
    </html>
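
If you would rather not rely on render_template, here is a minimal sketch that renders the same template directly with jinja2 and hands the result to send_email. The template directory, the file name dag_run_summary.html, the host_server value, and the recipient address are all assumptions for illustration; the Airflow context already contains dag_run, dag, and ts, so the template variables above resolve directly.

    from jinja2 import Environment, FileSystemLoader

    from airflow.utils.email import send_email

    def send_task_summary(**context):
        # Assumed locations/values -- adjust to your deployment.
        env = Environment(loader=FileSystemLoader("/path/to/templates"))
        template = env.get_template("dag_run_summary.html")
        # Pass the whole Airflow context so dag_run, dag, ts, etc. are
        # available inside the template, plus the assumed host_server.
        body = template.render(host_server="http://localhost:8080", **context)
        send_email(
            to="alerts@example.com",
            subject=f"{context['dag_run']} summary",
            html_content=body,
        )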

Another way you can go about this is to utilize the on_failure_callback on the DAG object, which fires when the DAG run fails.

    from airflow.models import DAG
    from datetime import datetime

    def send_task_summary(context):
        # DAG-level callbacks receive the context as a positional argument.
        tis = context['dag_run'].get_task_instances()
        for ti in tis:
            print(ti.__dict__)

    dag = DAG(
        dag_id='my_dag',
        schedule_interval='@once',
        start_date=datetime(2020, 1, 1),
        on_failure_callback=send_task_summary
    )
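
Since the goal is a summary for every run, not just failed ones, you can register the same callback for successful runs as well. A minimal sketch, assuming your Airflow version exposes the DAG-level on_success_callback alongside on_failure_callback:

    dag = DAG(
        dag_id='my_dag',
        schedule_interval='@once',
        start_date=datetime(2020, 1, 1),
        on_success_callback=send_task_summary,  # fires when the DAG run succeeds
        on_failure_callback=send_task_summary   # fires when the DAG run fails
    )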

Upvotes: 2
