Reputation: 2102
could someone please give me step by step manual on how to connect Apache Airflow to Slack workspace. I created webhook to my channel and what should I do with it next ?
Kind regards
Upvotes: 16
Views: 25774
Reputation: 1271
Try the new SlackWebhookOperator which is there in Airflow version>=1.10.0
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
slack_msg = "Hi Wssup?"
slack_test = SlackWebhookOperator(
task_id='slack_test',
http_conn_id='slack_connection',
webhook_token='/1234/abcd',
message=slack_msg,
channel='#airflow_updates',
username='airflow_'+os.environ['ENVIRONMENT'],
icon_emoji=None,
link_names=False,
dag=dag)
Note: Make sure you have slack_connection
added in your Airflow connections as
host=https://hooks.slack.com/services/
Upvotes: 10
Reputation: 18884
SlackAPIPostOperator
Operator in your DAG as belowSlackAPIPostOperator(
task_id='failure',
token='YOUR_TOKEN',
text=text_message,
channel=SLACK_CHANNEL,
username=SLACK_USER)
The above is the simplest way you can use Airflow to send messages to Slack.
However, if you want to configure Airflow to send messages to Slack on task failures, create a function and add on_failure_callback
to your tasks with the name of the created slack function. An example is below:
def slack_failed_task(contextDictionary, **kwargs):
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#datalabs",
token="...",
text = ':red_circle: DAG Failed',
owner = '_owner',)
return failed_alert.execute()
task_with_failed_slack_alerts = PythonOperator(
task_id='task0',
python_callable=<file to execute>,
on_failure_callback=slack_failed_task,
provide_context=True,
dag=dag)
Using SlackWebHook (Works only for Airflow >= 1.10.0):
If you want to use SlackWebHook
use SlackWebhookOperator
in a similar manner:
Upvotes: 20
Reputation: 2974
The full example with SlackWebhookOperator
usage as in @kaxil answer:
def slack_failed_task(task_name):
failed_alert = SlackWebhookOperator(
task_id='slack_failed_alert',
http_conn_id='slack_connection',
webhook_token=Variable.get("slackWebhookToken", default_var=""),
message='@here DAG Failed {}'.format(task_name),
channel='#epm-marketing-dev',
username='Airflow_{}'.format(ENVIRONMENT_SUFFIX),
icon_emoji=':red_circle:',
link_names=True,
)
return failed_alert.execute
task_with_failed_slack_alerts = PythonOperator(
task_id='task0',
python_callable=<file to execute>,
on_failure_callback=slack_failed_task,
provide_context=True,
dag=dag)
As @Deep Nirmal Note: Make sure you have slack_connection added in your Airflow connections as
host=https://hooks.slack.com/services/
Upvotes: 5