Reputation: 1680
I'm using the following Airflow version inside my Docker container, and I'm currently getting a broken DAG error:
FROM apache/airflow:2.3.4-python3.9
I have other DAGs that use the same 'request_cpu' argument and work perfectly, so I'm not sure what the issue could be.
Broken DAG: [/home/airflow/airflow/dags/my_project.py] Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 858, in __init__
self.resources = coerce_resources(resources)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 133, in coerce_resources
return Resources(**resources)
TypeError: Resources.__init__() got an unexpected keyword argument 'request_cpu'
This is my current DAG configuration:
# DAG configuration
DAG_ID = "my_project_id"
DAG_DESCRIPTION = "description"
DAG_IMAGE = image
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "max_active_tasks": 1,
    "max_active_runs": 1,
    "email_on_failure": True,
    "email": ["[email protected]"],
    "retries": 0,
    "email_on_retry": False,
    "image_pull_policy": "Always",
}
# Define desired resources.
compute_resources = {
    # Cpu: 500m milliCPU is about half cpu, other values, 1, 2, 4... for full cpu allocation
    "request_cpu": "500m",
    # Memory: Mi for Megabytes or Gi for Gigabytes
    "request_memory": "512Mi",
    "limit_cpu": "500m",
    "limit_memory": "1Gi",
}
with DAG(
    DAG_ID,
    default_args=default_args,
    start_date=datetime(2022, 5, 9),
    schedule_interval="0 21 */16 * *",  # 21:00 on days 1 and 17 of each month (twice per month)
    max_active_runs=1,
    max_active_tasks=1,
    catchup=False,
    description=DAG_DESCRIPTION,
    tags=["my tags"],
) as dag:
    # AWS credentials
    creds = tools.get_config_params(key="AWS-keys")
    my_task = KubernetesPodOperator(
        namespace="airflow",
        image=DAG_IMAGE,
        image_pull_secrets=[k8s.V1LocalObjectReference("docker-registry")],
        container_resources=compute_resources,
        env_vars={
            "AWS_ACCESS_KEY_ID": creds["access_key"],
            "AWS_SECRET_ACCESS_KEY": creds["secret_access_key"],
            "EXECUTION_DATE": "{{ execution_date }}",
        },
        cmds=["python3", "my_project.py"],
        is_delete_operator_pod=True,
        in_cluster=False,
        name="my-project-name",
        task_id="my-task",
        config_file=os.path.expanduser("~") + "/.kube/config",
        get_logs=True,
        resources=compute_resources,
    )
Upvotes: 3
Views: 2922
Reputation: 15921
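First, resources is deprecated, so you should use only container_resources. As your traceback shows, the resources argument ends up in BaseOperator, whose Resources class does not accept request_cpu. Also, container_resources expects a V1ResourceRequirements object, not a dict. You should do: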
from kubernetes.client import models as k8s
compute_resources = k8s.V1ResourceRequirements(
    requests={
        'memory': '512Mi',
        'cpu': '500m'
    },
    limits={
        'memory': '1Gi',
        'cpu': '500m'
    }
)
Then
my_task = KubernetesPodOperator(..., container_resources=compute_resources)
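If you have several DAGs that still use the old dict with request_* and limit_* keys, a small conversion helper may save some retyping. The following is only a sketch: split_legacy_resources is a hypothetical name, not part of Airflow or the kubernetes client, and it assumes the only keys in use are the cpu and memory ones from the question.

```python
# Hypothetical helper (not an Airflow or kubernetes-client API): splits the
# old-style dict into the "requests" and "limits" mappings that
# V1ResourceRequirements takes as keyword arguments.
def split_legacy_resources(legacy):
    prefixes = {"request": "requests", "limit": "limits"}
    allowed = {"cpu", "memory"}  # assumption: only these two resources are used
    out = {"requests": {}, "limits": {}}
    for key, value in legacy.items():
        # "request_cpu" -> ("request", "_", "cpu")
        prefix, _, resource = key.partition("_")
        if prefix not in prefixes or resource not in allowed:
            raise ValueError(f"unrecognized resource key: {key!r}")
        out[prefixes[prefix]][resource] = value
    return out


legacy = {
    "request_cpu": "500m",
    "request_memory": "512Mi",
    "limit_cpu": "500m",
    "limit_memory": "1Gi",
}
kwargs = split_legacy_resources(legacy)
print(kwargs)
```

With the kubernetes client installed, the result should be usable as k8s.V1ResourceRequirements(**kwargs) and then passed to container_resources as shown above. Unknown keys raise a ValueError instead of being silently dropped, so a typo in a key shows up when the DAG is parsed.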
Upvotes: 7