The Dan

Reputation: 1680

Airflow - KubernetesPodOperator - Broken DAG: unexpected keyword argument 'request_cpu'

I'm using the following Airflow version inside my Docker container, and I'm currently hitting a broken DAG error:

FROM apache/airflow:2.3.4-python3.9

I have other DAGs running with the same 'request_cpu' argument that work perfectly, so I'm not sure what the issue could be:

Broken DAG: [/home/airflow/airflow/dags/my_project.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 858, in __init__
    self.resources = coerce_resources(resources)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 133, in coerce_resources
    return Resources(**resources)
TypeError: Resources.__init__() got an unexpected keyword argument 'request_cpu'
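The mechanism behind this error is plain keyword unpacking: Resources(**resources) passes every dict key as a keyword argument, so any key that is not a constructor parameter raises TypeError. A minimal sketch with a hypothetical stand-in class (simplified; not Airflow's actual implementation) reproduces it:

```python
# Hypothetical, simplified stand-in for Airflow's Resources class,
# whose constructor does not accept a 'request_cpu' parameter.
class Resources:
    def __init__(self, cpus=None, ram=None, disk=None, gpus=None):
        self.cpus, self.ram, self.disk, self.gpus = cpus, ram, disk, gpus

# The dict keys are passed as keyword arguments; 'request_cpu' matches
# no parameter name, so Python raises TypeError before __init__ runs.
resources = {"request_cpu": "500m", "request_memory": "512Mi"}
try:
    Resources(**resources)
except TypeError as exc:
    print(exc)  # e.g. "... unexpected keyword argument 'request_cpu'"
```

This is why the same dict works elsewhere only if it ends up in a code path that accepts those key names; passed to BaseOperator's resources coercion, it does not.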

This is my current DAG configuration

# DAG configuration
DAG_ID = "my_project_id"
DAG_DESCRIPTION = "description"
DAG_IMAGE = image

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "max_active_tasks": 1,
    "max_active_runs": 1,
    "email_on_failure": True,
    "email": ["[email protected]"],
    "retries": 0,
    "email_on_retry": False,
    "image_pull_policy": "Always",
}

# Define desired resources.
compute_resources = {
    # CPU: 500m (milliCPU) is about half a CPU; use 1, 2, 4... for full CPUs
    "request_cpu": "500m",
    # Memory: Mi for Megabytes or Gi for Gigabytes
    "request_memory": "512Mi",
    "limit_cpu": "500m",
    "limit_memory": "1Gi",
}


with DAG(
    DAG_ID,
    default_args=default_args,
    start_date=datetime(2022, 5, 9),
    schedule_interval="0 21 */16 * *",  # Every 16 days or twice per month
    max_active_runs=1,
    max_active_tasks=1,
    catchup=False,
    description=DAG_DESCRIPTION,
    tags=["my tags"],
) as dag:
    # AWS credentials
    creds = tools.get_config_params(key="AWS-keys")

    my_task = KubernetesPodOperator(
        namespace="airflow",
        image=DAG_IMAGE,
        image_pull_secrets=[k8s.V1LocalObjectReference("docker-registry")],
        container_resources=compute_resources,
        env_vars={
            "AWS_ACCESS_KEY_ID": creds["access_key"],
            "AWS_SECRET_ACCESS_KEY": creds["secret_access_key"],
            "EXECUTION_DATE": "{{ execution_date }}",
        },
        cmds=["python3", "my_project.py"],
        is_delete_operator_pod=True,
        in_cluster=False,
        name="my-project-name",
        task_id="my-task",
        config_file=os.path.expanduser("~") + "/.kube/config",
        get_logs=True,
        resources=compute_resources,
    )

Upvotes: 3

Views: 2922

Answers (1)

Elad Kalif

Reputation: 15921

First, resources is deprecated, so you should use only container_resources.

Second, container_resources expects a V1ResourceRequirements object, not a dict. You should do:

from kubernetes.client import models as k8s

compute_resources = k8s.V1ResourceRequirements(
    requests={
        'memory': '512Mi',
        'cpu': '500m'
    },
    limits={
        'memory': '1Gi',
        'cpu': '500m'
    }
)

Then

my_task = KubernetesPodOperator(..., container_resources=compute_resources)

Upvotes: 7
