Reputation: 23
If I run the DAG below with an AWS EFS volume mounted, it works with no issues up to a pod count of 25. But if I increase the pod count to 100, I start getting mount timeout errors:
`Unable to attach or mount volumes: unmounted volumes=[logs], unattached volumes=[logs config backups kube-api-access-jxz9w]: timed out waiting for the condition
Unable to attach or mount volumes: unmounted volumes=[logs], unattached volumes=[backups kube-api-access-q6b8x logs config]: timed out waiting for the condition `
Dags/test_parallelism.py
import time
import logging
import os
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from kubernetes.client import models as k8s


def test(**context):
    """
    Tests whether the volume has been mounted.
    """
    time.sleep(int(os.environ["parallel_test_sleep"]))


default_args = {
    "owner": 'Airflow',
    "start_date": datetime(2021, 1, 1),
}

dag = DAG(
    dag_id='test_1000_task_1',
    schedule_interval="0 * * * *",
    default_args=default_args,
    catchup=False,
)

with dag:
    for i in range(int(os.environ["parallel_test_count"])):
        task = PythonOperator(
            task_id=f"task_{i}",
            python_callable=test,
            provide_context=True,
            executor_config={
                "pod_override": k8s.V1Pod(
                    spec=k8s.V1PodSpec(
                        containers=[
                            k8s.V1Container(
                                name="base",
                                volume_mounts=[
                                    k8s.V1VolumeMount(
                                        mount_path="/opt/airflow/backups/",
                                        name="backups",
                                        read_only=False,
                                    )
                                ],
                            )
                        ],
                        volumes=[
                            k8s.V1Volume(
                                name="backups",
                                persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
                                    claim_name="airflow-s3-pvc"
                                ),
                            )
                        ],
                    )
                ),
            },
        )
The EFS volumes should mount on the Kubernetes pods, since the access mode is set to ReadWriteMany. I have mounted two EFS volumes on all the Kubernetes pods: one via the DAG's pod_override and one for the Airflow logs via the Helm chart values:
logs:
  persistence:
    # Enable persistent volume for storing logs
    enabled: true
    # Volume size for logs
    size: 14Gi
    # Annotations for the logs PVC
    annotations: {}
    # If using a custom storageClass, pass name here
    storageClassName: "efs-sc"
    ## the name of an existing PVC to use
    existingClaim: "airflow-logs"
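For reference, the backups claim referenced in the pod_override would be a ReadWriteMany PVC along these lines (a sketch only; the claim and storage class names are taken from the question, and the size is a placeholder since EFS does not enforce the request):

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-s3-pvc
spec:
  accessModes:
    - ReadWriteMany   # EFS supports many concurrent readers/writers
  storageClassName: efs-sc
  resources:
    requests:
      storage: 10Gi   # placeholder; EFS capacity is elastic
```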
kubectl describe output for one of the unmounted pods:
Name: test-1000-task-1-task-44-ff046add566c46bdb78ead1aa72d4e6c
Namespace: sb-jniravel
Priority: 0
Node: ip-10-0-133-146.ec2.internal/10.0.133.146
Start Time: Wed, 16 Aug 2023 09:21:57 -0500
Labels: airflow-worker=1188
airflow_version=2.6.0
component=worker
dag_id=test_1000_task_1
kubernetes_executor=True
release=airflow
run_id=manual__2023-08-16T142155.7297460000-c3a08be2d
task_id=task_44
tier=airflow
try_number=1
Annotations: dag_id: test_1000_task_1
openshift.io/scc: airflow-cluster-scc
run_id: manual__2023-08-16T14:21:55.729746+00:00
seccomp.security.alpha.kubernetes.io/pod: runtime/default
task_id: task_44
try_number: 1
Status: Pending
IP:
IPs: <none>
Containers:
base:
Container ID:
Image: truu.jfrog.io/airflow-etl-repo/airflow:v37
Image ID:
Port: <none>
Host Port: <none>
Args:
airflow
tasks
run
test_1000_task_1
task_44
manual__2023-08-16T14:21:55.729746+00:00
--local
--subdir
DAGS_FOLDER/test_parallelism.py
State: Waiting
Reason: ContainerCreating
Ready: False
Restart Count: 0
Environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__FERNET_KEY: <set to the key 'fernet-key' in secret 'airflow-fernet-key'> Optional: false
AIRFLOW__CORE__SQL_ALCHEMY_CONN: <set to the key 'connection' in secret 'airflow-airflow-metadata'> Optional: false
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: <set to the key 'connection' in secret 'airflow-airflow-metadata'> Optional: false
AIRFLOW_CONN_AIRFLOW_DB: <set to the key 'connection' in secret 'airflow-airflow-metadata'> Optional: false
AIRFLOW__WEBSERVER__SECRET_KEY: <set to the key 'webserver-secret-key' in secret 'airflow-webserver-secret-key'> Optional: false
AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT: 500
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT: 500
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 360.0
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 360.0
AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE: -1
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE: -1
AIRFLOW__CORE__PARALLELISM: 500
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__PARALLELISM: 500
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: 500
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: 500
AIRFLOW__SCHEDULER__PARSING_PROCESSES: 32
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__PARSING_PROCESSES: 32
AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD: 60
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD: 60
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 500
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 500
AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT: 360
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT: 360
AIRFLOW__KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE: 25
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE: 25
AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: 600
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: 600
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 600
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 600
AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP: 500
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP: 500
AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE: 500
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE: 500
AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD: 600
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD: 600
parallel_test_count: 50
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__parallel_test_count: 50
parallel_test_sleep: 60
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__parallel_test_sleep: 60
AIRFLOW_IS_K8S_EXECUTOR_POD: True
Mounts:
/opt/airflow/airflow.cfg from config (ro,path="airflow.cfg")
/opt/airflow/backups/ from backups (rw)
/opt/airflow/config/airflow_local_settings.py from config (ro,path="airflow_local_settings.py")
/opt/airflow/logs from logs (rw)
/var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-k76b5 (ro)
Conditions:
Type Status
Initialized True
Ready False
ContainersReady False
PodScheduled True
Volumes:
logs:
Type: PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
ClaimName: airflow-logs
ReadOnly: false
config:
Type: ConfigMap (a volume populated by a ConfigMap)
Name: airflow-airflow-config
Optional: false
backups:
Type: PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
ClaimName: airflow-s3-pvc
ReadOnly: false
kube-api-access-k76b5:
Type: Projected (a volume that contains injected data from multiple sources)
TokenExpirationSeconds: 3607
ConfigMapName: kube-root-ca.crt
ConfigMapOptional: <nil>
DownwardAPI: true
ConfigMapName: openshift-service-ca.crt
ConfigMapOptional: <nil>
QoS Class: BestEffort
Node-Selectors: <none>
Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 111s default-scheduler Successfully assigned sb-jniravel/test-1000-task-1-task-44-ff046add566c46bdb78ead1aa72d4e6c to ip-10-0-133-146.ec2.internal
Warning FailedMount <invalid> kubelet Unable to attach or mount volumes: unmounted volumes=[logs backups], unattached volumes=[kube-api-access-k76b5 logs config backups]: timed out waiting for the condition
Upvotes: 0
Views: 300
Reputation: 1
We encountered a similar issue on Airflow version 2.6.3: worker pods timed out waiting for the 'logs' volume to mount. The following log entries were observed:
Normal DefaultInstanceTypeMatch 5m2s EciService [eci.containergroup]The default instanceType used for the current eci instance is 2.0-4.0Gi
Normal SuccessfulHitImageCache 5m1s EciService [eci.imagecache]Successfully hit image cache imc-xxxxxxxxx, eci will be scheduled with this image cache.
Warning FailedMount 2m48s kubelet Unable to attach or mount volumes: unmounted volumes=[logs], unattached volumes=[dags spark-defaults sqldwh-dbt airflow-dbt-worker-token-xxxxxx logs config]: timed out waiting for the condition
Upon inspecting the Kubernetes server logs, we discovered that each pod startup attempted a recursive chown/chmod on the 'logs' directory, which timed out when the directory contained a large number of files. We resolved the issue by adding the following configuration:
# Default security context for Airflow
securityContext:
  fsGroupChangePolicy: "OnRootMismatch"
With fsGroupChangePolicy: "OnRootMismatch", the kubelet skips the recursive ownership and permission change when the root of the volume already has the expected fsGroup and permissions, which avoids the long startup delay on volumes containing many files. For more details on fsGroupChangePolicy, see: https://kubernetes.io/blog/2020/12/14/kubernetes-release-1.20-fsgroupchangepolicy-fsgrouppolicy/
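In the rendered worker pod, that Helm value ends up on the pod-level securityContext. A minimal sketch of the resulting pod spec fragment (the fsGroup value here is an assumption; use whatever group your Airflow image runs as):

```yaml
apiVersion: v1
kind: Pod
spec:
  securityContext:
    fsGroup: 65534                      # example group; match your image's airflow user
    fsGroupChangePolicy: OnRootMismatch # skip recursive chown when the volume root already matches
  containers:
    - name: base
      volumeMounts:
        - name: logs
          mountPath: /opt/airflow/logs
```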
Upvotes: 0