Reputation: 45
I have an Airflow function that sends a failed-task notification to a Teams group. Because of Microsoft's new policy, my old webhook call will stop working in a few months, so I want to adapt the code now. Here is my current code:
from airflow.models import Variable
import logging
import requests


def failed_task_notify_teams(context):
    logging.info("Send notification to the Teams Group")
    payload = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "title": "Airflow Task Error",
        "summary": f"Task {context['task_instance_key_str']} : Failed",
        "themeColor": "F44336",
        "sections": [
            {
                "activityTitle": f"Task {context['task_instance_key_str']} : Failed",
                "activitySubtitle": f"DAG: {context['dag'].dag_id}",
                "facts": [
                    {
                        "name": "Date",
                        "value": context['ds']
                    },
                    {
                        "name": "Log URL",
                        "value": context['task_instance'].log_url
                    }
                ]
            }
        ],
        "potentialAction": [{
            "@type": "OpenUri",
            "name": "See Logs",
            "targets": [{
                "os": "default",
                "uri": context['task_instance'].log_url
            }]
        }]
    }
    headers = {"content-type": "application/json"}
    requests.post(Variable.get('teams_webhook_secret'), json=payload, headers=headers)
    logging.info("Teams notification sent to the group!")
I expected the new code to look almost the same, but this MessageCard payload does not work with a Microsoft 365 Workflow webhook.
Upvotes: 0
Views: 546
Reputation: 45
After looking a bit further I found a solution. The code does look almost the same; I only needed to change the structure of the payload. Here is the working code with the modifications:
from airflow.models import Variable
import logging
import requests


def failed_task_notify_teams(context):
    logging.info("Send notification to the Teams Group")
    # Workflows webhooks expect a "message" wrapping one or more Adaptive Card attachments
    payload = {
        "type": "message",
        "attachments": [{
            "contentType": "application/vnd.microsoft.card.adaptive",
            "contentUrl": None,
            "content": {
                "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                "type": "AdaptiveCard",
                "version": "1.2",
                "body": [
                    {
                        # Card title
                        "type": "TextBlock",
                        "text": f"Task {context['task_instance_key_str']} : Failed",
                        "weight": "bolder",
                        "size": "medium",
                        "color": "attention",
                    },
                    {
                        "type": "TextBlock",
                        "text": f"DAG : {context['dag'].dag_id}",
                        "spacing": "none"
                    },
                    {
                        # Replaces the "facts" list of the old MessageCard section
                        "type": "FactSet",
                        "facts": [
                            {
                                "title": "Date :",
                                "value": context['ds']
                            },
                            {
                                "title": "Log URL :",
                                "value": f"[Click here]({context['task_instance'].log_url})"
                            }
                        ]
                    }
                ],
                "actions": [
                    {
                        # Replaces the old "potentialAction" / OpenUri entry
                        "type": "Action.OpenUrl",
                        "title": "See logs",
                        "url": context['task_instance'].log_url
                    }
                ]
            }
        }]
    }
    headers = {"content-type": "application/json"}
    response = requests.post(Variable.get('teams_azure_webook_secret'), json=payload, headers=headers)
    # Accept any 2xx status: the webhook may answer with 202 Accepted rather than 200
    if response.ok:
        logging.info("Notification sent to Teams Group")
    else:
        logging.error(f"Failed to send notification to Teams Group: {response.status_code} - {response.text}")
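If you want to check the payload shape without running Airflow or calling the webhook, one option is to move the card construction into a plain function. This is only a sketch: `build_teams_payload` and its parameter names are hypothetical (they are not part of Airflow or Teams), and it assumes the same fields as the code above.

```python
import json


def build_teams_payload(task_key, dag_id, run_date, log_url):
    """Hypothetical helper: wrap one Adaptive Card in a Teams 'message' payload."""
    card = {
        "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
        "type": "AdaptiveCard",
        "version": "1.2",
        "body": [
            {
                "type": "TextBlock",
                "text": f"Task {task_key} : Failed",
                "weight": "bolder",
                "size": "medium",
                "color": "attention",
            },
            {"type": "TextBlock", "text": f"DAG : {dag_id}", "spacing": "none"},
            {
                "type": "FactSet",
                "facts": [
                    {"title": "Date :", "value": run_date},
                    {"title": "Log URL :", "value": f"[Click here]({log_url})"},
                ],
            },
        ],
        "actions": [
            {"type": "Action.OpenUrl", "title": "See logs", "url": log_url}
        ],
    }
    return {
        "type": "message",
        "attachments": [
            {
                "contentType": "application/vnd.microsoft.card.adaptive",
                "contentUrl": None,
                "content": card,
            }
        ],
    }


if __name__ == "__main__":
    # Example values are made up for illustration only
    example = build_teams_payload(
        "my_dag__my_task__20240101", "my_dag", "2024-01-01", "http://localhost:8080/log"
    )
    print(json.dumps(example, indent=2))
```

Inside the callback you would then pass `context['task_instance_key_str']`, `context['dag'].dag_id`, `context['ds']` and `context['task_instance'].log_url` to this function and send the result with `requests.post(..., json=payload)` as before.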
Upvotes: 1