Aviv Oron
Aviv Oron

Reputation: 565

Airflow UI link in SlackAPIPostOperator?

Im using SlackAPIPostOperator in Airflow to send Slack messages on task failures. I wondered if there's a smart way to add a link to the airflow UI logs page of the failed task to the slack message.

The following is an example I want to achieve:

http://myserver-uw1.myaws.com:8080/admin/airflow/graph?execution_date=...&arrange=LR&root=&dag_id=MyDAG&_csrf_token=mytoken

The current message is:

def slack_failed_task(context):
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#mychannel",
        token="...",
        text=':red_circle: Failure on: ' + 
             str(context['dag']) +
             '\nRun ID: ' + str(context['run_id']) +
             '\nTask: ' + str(context['task_instance']))
    return failed_alert.execute(context=context)

Upvotes: 7

Views: 3170

Answers (2)

Tanjin
Tanjin

Reputation: 2452

We can also do this using the log_url attribute in the Task Instance

def slack_failed_task(context):
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#mychannel",
        token="...",
        text=':red_circle: Failure on: ' + 
             str(context['dag']) +
             '\nRun ID: ' + str(context['run_id']) +
             '\nTask: ' + str(context['task_instance']) + 
             '\nLogs: <{url}|to Airflow UI>'.format(url=context['task_instance'].log_url)
    )
    return failed_alert.execute(context=context)

I know this is available since version 1.10.4 at the very least.

Upvotes: 3

Daniel Huang
Daniel Huang

Reputation: 6548

You can build the url to the UI with the config value base_url under the [webserver] section and then use Slack's message format <http://example.com|stuff> for links.

from airflow import configuration

def slack_failed_task(context):
    link = '<{base_url}/admin/airflow/log?dag_id={dag_id}&task_id={task_id}&execution_date={execution_date}|logs>'.format(
        base_url=configuration.get('webserver', 'BASE_URL'),
        dag_id=context['dag'].dag_id,
        task_id=context['task_instance'].task_id,
        execution_date=context['ts']))  # equal to context['execution_date'].isoformat())

    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#mychannel",
        token="...",
        text=':red_circle: Failure on: ' + 
             str(context['dag']) +
             '\nRun ID: ' + str(context['run_id']) +
             '\nTask: ' + str(context['task_instance']) +
             '\nSee ' + link + ' to debug')
    return failed_alert.execute(context=context)

Upvotes: 7

Related Questions