slashie

Reputation: 137

airflow upgrade 2.0 kubernetes_pod_operator not working

I upgraded my Airflow to 2.0. After upgrading, my kubernetes_pod_operator is not working and gives me the following error. How do I fix this upgrade issue? What do I need to change in the code to make it work in Airflow 2.0?

Error:

Broken DAG: [/home/airflow/gcs/dags/daily_data_dag.py] Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 178, in apply_defaults
    result = func(self, *args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 506, in __init__
    raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to KubernetesPodOperator (task_id: snowpack_daily_data_pipeline_16_11_2021). Invalid arguments were:
**kwargs: {'email_on_success': True}

Code:

import datetime

from airflow import models
from airflow.contrib.operators import kubernetes_pod_operator

from kubernetes.client import models as k8s

# DEFINE VARS HERE:

dag_name = "daily_data_pipeline"
schedule_interval = '@daily'
email = ["[email protected]"]
# get this from Gitlab
docker_image = "registry.gitlab.com/xxxx:dev"


default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2021, 6, 26),
    'depends_on_past': False,
    'max_active_runs': 1,
    # 'concurrency': 1
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        dag_name,
        schedule_interval=schedule_interval,
        default_args=default_dag_args,
        catchup=False) as dag:

    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id=f'{dag_name}_{datetime.datetime.now().strftime("%d_%m_%Y")}',
        # Name of task you want to run, used to generate Pod ID.
        name=dag_name,
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        # cmds=['echo'],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace='default',
        # Setup email on failure and success
        email_on_failure=True,
        email_on_success=True,
        email=email,
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image=docker_image,
        image_pull_secrets='gitlab',
        image_pull_policy='Always',
        # if you have a larger image, you may need to increase the default from 120
        startup_timeout_seconds=600)

Upvotes: 0

Views: 1580

Answers (1)

jccampanero

Reputation: 53411

The problem has to do with the email_on_success parameter: as you can see in the BaseOperator documentation, only email_on_retry and email_on_failure are supported.
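
As a quick illustration, these are the email-related arguments BaseOperator does accept (the recipient address below is just a placeholder), and email_on_success is not among them:

default_args = {
    'email': ['[email protected]'],  # placeholder recipient list
    'email_on_failure': True,          # supported by BaseOperator
    'email_on_retry': False,           # supported by BaseOperator
    # 'email_on_success': True,        # not accepted; raises AirflowException
}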

If you need to send an email on success, you may use on_success_callback. Please consider, for instance, this example obtained from this GitHub issue:

from airflow.models import TaskInstance
from airflow.utils.email import send_email

# EMAIL_LIST must be defined as a list of recipient addresses
def on_success_callback(context):
    # The callback context exposes the task instance that just finished
    ti: TaskInstance = context["ti"]
    dag_id = ti.dag_id
    task_id = ti.task_id

    msg = "DAG succeeded"
    subject = f"Success {dag_id}.{task_id}"
    send_email(to=EMAIL_LIST, subject=subject, html_content=msg)

Please note that the callback in this example is defined at the DAG level. The Airflow documentation provides several additional examples.
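
For completeness, here is a minimal sketch of that DAG-level wiring, assuming the on_success_callback function above is in scope (the DAG id and schedule are placeholders):

import datetime

from airflow import models

with models.DAG(
        'example_success_email',  # hypothetical DAG id
        schedule_interval='@daily',
        start_date=datetime.datetime(2021, 6, 26),
        # A DAG-level callback fires once per successful DAG run,
        # rather than once per task
        on_success_callback=on_success_callback,
        catchup=False) as dag:
    ...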

In your specific use case, it could look like the following:

import datetime

from airflow import models
from airflow.contrib.operators import kubernetes_pod_operator
from airflow.utils.email import send_email

from kubernetes.client import models as k8s

# DEFINE VARS HERE:

dag_name = "daily_data_pipeline"
schedule_interval = '@daily'
email = ["[email protected]"]
# get this from Gitlab
docker_image = "registry.gitlab.com/xxxx:dev"


default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2021, 6, 26),
    'depends_on_past': False,
    'max_active_runs': 1,
    # 'concurrency': 1
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        dag_name,
        schedule_interval=schedule_interval,
        default_args=default_dag_args,
        catchup=False) as dag:

    def notify_successful_execution(context):
        # Access the information provided in context if required
        send_email(
            to=email,  # "email" is already a list of addresses
            subject='Successful execution',
            html_content='The process was executed successfully'
        )

    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id=f'{dag_name}_{datetime.datetime.now().strftime("%d_%m_%Y")}',
        # Name of task you want to run, used to generate Pod ID.
        name=dag_name,
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        # cmds=['echo'],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace='default',
        # Setup email on failure and success
        email_on_failure=True,
        # email_on_success is not supported by BaseOperator, so it stays commented out
        # email_on_success=True,
        email=email,
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image=docker_image,
        image_pull_secrets='gitlab',
        image_pull_policy='Always',
        # if you have a larger image, you may need to increase the default from 120
        startup_timeout_seconds=600,
        # Use on success callback instead for your email notifications
        on_success_callback=notify_successful_execution)

Upvotes: 2
