Reputation: 137
I upgraded my Airflow to 2.0. After upgrading, my KubernetesPodOperator stopped working and gives me the following error. How do I fix this upgrade issue? What do I need to change in the code to make it work in Airflow 2.0?
Error:
Broken DAG: [/home/airflow/gcs/dags/daily_data_dag.py] Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 178, in apply_defaults
result = func(self, *args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 506, in __init__
raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to KubernetesPodOperator (task_id: snowpack_daily_data_pipeline_16_11_2021). Invalid arguments were:
**kwargs: {'email_on_success': True}
Code:
import datetime

from airflow import models
from airflow.contrib.operators import kubernetes_pod_operator
from kubernetes.client import models as k8s

# DEFINE VARS HERE:
dag_name = "daily_data_pipeline"
schedule_interval = '@daily'
email = ["[email protected]"]
# get this from Gitlab
docker_image = "registry.gitlab.com/xxxx:dev"

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2021, 6, 26),
    'depends_on_past': False,
    'max_active_runs': 1,
    # 'concurrency': 1
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        dag_name,
        schedule_interval=schedule_interval,
        default_args=default_dag_args,
        catchup=False) as dag:

    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id=f'{dag_name}_{datetime.datetime.now().strftime("%d_%m_%Y")}',
        # Name of task you want to run, used to generate Pod ID.
        name=dag_name,
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        # cmds=['echo'],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace='default',
        # Setup email on failure and success
        email_on_failure=True,
        email_on_success=True,
        email=email,
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image=docker_image,
        image_pull_secrets='gitlab',
        image_pull_policy='Always',
        # if you have a larger image, you may need to increase the default from 120
        startup_timeout_seconds=600)
Upvotes: 0
Views: 1580
Reputation: 53411
The problem has to do with the email_on_success parameter: as you can see in the BaseOperator documentation, only email_on_retry and email_on_failure are supported.
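For reference, here is a minimal sketch of the email-related settings that BaseOperator does accept in Airflow 2, placed in default_args (the recipient address is a placeholder):
import datetime

default_dag_args = {
    'start_date': datetime.datetime(2021, 6, 26),
    # Email settings accepted by BaseOperator in Airflow 2:
    'email': ['[email protected]'],  # placeholder recipient list
    'email_on_failure': True,
    'email_on_retry': False,
    # 'email_on_success' is not a BaseOperator argument and raises AirflowException
}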
If you need to send an email on success, you can use on_success_callback instead. Consider, for instance, this example obtained from this GitHub issue:
from airflow.models import TaskInstance
from airflow.utils.email import send_email

def on_success_callback(context):
    ti: TaskInstance = context["ti"]
    dag_id = ti.dag_id
    task_id = ti.task_id
    msg = "DAG succeeded"
    subject = f"Success {dag_id}.{task_id}"
    # EMAIL_LIST is assumed to be defined elsewhere as a list of recipient addresses
    send_email(to=EMAIL_LIST, subject=subject, html_content=msg)
Please note that the callback in the example is defined at the DAG level. The Airflow documentation provides several additional examples.
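For illustration, here is a minimal sketch of wiring such a callback at the DAG level; the DAG id, schedule, recipient address and the send_success_email name are placeholders, and the DAG-level on_success_callback fires once per successful DAG run:
import datetime

from airflow import models
from airflow.utils.email import send_email

def send_success_email(context):
    # context carries run metadata such as the dag_run and execution date
    send_email(
        to=["[email protected]"],  # placeholder recipient
        subject="DAG succeeded",
        html_content="The DAG run completed successfully",
    )

with models.DAG(
        "example_dag",  # placeholder DAG id
        schedule_interval="@daily",
        default_args={"start_date": datetime.datetime(2021, 6, 26)},
        # called once when the whole DAG run succeeds
        on_success_callback=send_success_email,
        catchup=False) as dag:
    ...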
In your specific use case, it could look like the following:
import datetime

from airflow import models
from airflow.contrib.operators import kubernetes_pod_operator
from airflow.utils.email import send_email
from kubernetes.client import models as k8s

# DEFINE VARS HERE:
dag_name = "daily_data_pipeline"
schedule_interval = '@daily'
email = ["[email protected]"]
# get this from Gitlab
docker_image = "registry.gitlab.com/xxxx:dev"

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2021, 6, 26),
    'depends_on_past': False,
    'max_active_runs': 1,
    # 'concurrency': 1
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        dag_name,
        schedule_interval=schedule_interval,
        default_args=default_dag_args,
        catchup=False) as dag:

    def notify_successful_execution(context):
        # Access the information provided in context if required
        send_email(
            to=email,  # email is already a list, so pass it directly
            subject='Successful execution',
            html_content='The process was executed successfully'
        )

    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id=f'{dag_name}_{datetime.datetime.now().strftime("%d_%m_%Y")}',
        # Name of task you want to run, used to generate Pod ID.
        name=dag_name,
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        # cmds=['echo'],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace='default',
        # Setup email on failure and success
        email_on_failure=True,
        # Comment out the following parameter, it is unsupported in Airflow 2
        # email_on_success=True,
        email=email,
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image=docker_image,
        image_pull_secrets='gitlab',
        image_pull_policy='Always',
        # if you have a larger image, you may need to increase the default from 120
        startup_timeout_seconds=600,
        # Use on_success_callback instead for your email notifications
        on_success_callback=notify_successful_execution)
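As a side note for the Airflow 2 upgrade: the airflow.contrib import path only keeps working through a deprecation shim. Assuming the apache-airflow-providers-cncf-kubernetes package is installed in your environment, the operator can also be imported directly from the provider package, for example:
# Provider import path for Airflow 2 (requires apache-airflow-providers-cncf-kubernetes)
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator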
Upvotes: 2