Jay j

Reputation: 23

Airflow Kubernetes EFS ReadWriteMany volume mount not working if the pod count is around 100


If I run the DAG below with the AWS EFS volume mounted, it works with no issues for pod counts up to 25. But if I increase the pod count to 100, I start getting timeout errors:

Unable to attach or mount volumes: unmounted volumes=[logs], unattached volumes=[logs config backups kube-api-access-jxz9w]: timed out waiting for the condition

Unable to attach or mount volumes: unmounted volumes=[logs], unattached volumes=[backups kube-api-access-q6b8x logs config]: timed out waiting for the condition

Dags/test_parallelism.py

import time
import logging
import os
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from kubernetes.client import models as k8s

def test(**context):
    """
    Tests whether the volume has been mounted.
    """
    time.sleep(int(os.environ["parallel_test_sleep"]))

default_args = {
    "owner": 'Airflow',
    "start_date": datetime(2021, 1, 1),
}

dag = DAG(
    dag_id='test_1000_task_1',
    schedule_interval="0 * * * *",
    default_args=default_args,
    catchup=False
)

with dag:
    for i in range(int(os.environ["parallel_test_count"])):
        task = PythonOperator(
            task_id=f"task_{i}",
            python_callable=test,
            provide_context=True,
            executor_config={
                "pod_override": k8s.V1Pod(
                    spec=k8s.V1PodSpec(
                        containers=[
                            k8s.V1Container(
                                name="base",
                                volume_mounts=[
                                    k8s.V1VolumeMount(
                                        mount_path="/opt/airflow/backups/", name="backups", read_only=False
                                    )
                                ],
                            )
                        ],
                        volumes=[
                            k8s.V1Volume(
                                name="backups",
                                persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-s3-pvc"),
                            )
                        ],
                    )
                ),
            }
        )

Since the access mode is set to ReadWriteMany, the EFS volume should mount on all of the Kubernetes pods.

I have mounted 2 EFS volumes on all the Kubernetes pods: one ("backups") via the DAG pod override above, and one for the Airflow logs, configured in the Helm chart values:

logs:
  persistence:
    # Enable persistent volume for storing logs
    enabled: true
    # Volume size for logs
    size: 14Gi
    # Annotations for the logs PVC
    annotations: {}
    # If using a custom storageClass, pass name here
    storageClassName: "efs-sc"
    ## the name of an existing PVC to use
    existingClaim: "airflow-logs"
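For reference, a PVC that supports this kind of many-pod mount is typically backed by the AWS EFS CSI driver and requests ReadWriteMany. The sketch below is an assumed setup, not taken from the question; the filesystem ID, volume name, and claim name are placeholders, and only the storage class name `efs-sc` matches the values above:

```yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: efs-sc                  # matches storageClassName in the Helm values
provisioner: efs.csi.aws.com    # AWS EFS CSI driver
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs            # existingClaim referenced in the Helm values
spec:
  accessModes:
    - ReadWriteMany             # required so many worker pods can mount it at once
  storageClassName: efs-sc
  resources:
    requests:
      storage: 14Gi
```

EFS ignores the requested size, but the field is still required by the PVC API.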

kubectl describe output for one of the unmounted pods:


Name:         test-1000-task-1-task-44-ff046add566c46bdb78ead1aa72d4e6c
Namespace:    sb-jniravel
Priority:     0
Node:         ip-10-0-133-146.ec2.internal/10.0.133.146
Start Time:   Wed, 16 Aug 2023 09:21:57 -0500
Labels:       airflow-worker=1188
              airflow_version=2.6.0
              component=worker
              dag_id=test_1000_task_1
              kubernetes_executor=True
              release=airflow
              run_id=manual__2023-08-16T142155.7297460000-c3a08be2d
              task_id=task_44
              tier=airflow
              try_number=1
Annotations:  dag_id: test_1000_task_1
              openshift.io/scc: airflow-cluster-scc
              run_id: manual__2023-08-16T14:21:55.729746+00:00
              seccomp.security.alpha.kubernetes.io/pod: runtime/default
              task_id: task_44
              try_number: 1
Status:       Pending
IP:
IPs:          <none>
Containers:
  base:
    Container ID:
    Image:         truu.jfrog.io/airflow-etl-repo/airflow:v37
    Image ID:
    Port:          <none>
    Host Port:     <none>
    Args:
      airflow
      tasks
      run
      test_1000_task_1
      task_44
      manual__2023-08-16T14:21:55.729746+00:00
      --local
      --subdir
      DAGS_FOLDER/test_parallelism.py
    State:          Waiting
      Reason:       ContainerCreating
    Ready:          False
    Restart Count:  0
    Environment:
      AIRFLOW__CORE__EXECUTOR:                                                                                   LocalExecutor
      AIRFLOW__CORE__FERNET_KEY:                                                                                 <set to the key 'fernet-key' in secret 'airflow-fernet-key'>                      Optional: false
      AIRFLOW__CORE__SQL_ALCHEMY_CONN:                                                                           <set to the key 'connection' in secret 'airflow-airflow-metadata'>                Optional: false
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN:                                                                       <set to the key 'connection' in secret 'airflow-airflow-metadata'>                Optional: false
      AIRFLOW_CONN_AIRFLOW_DB:                                                                                   <set to the key 'connection' in secret 'airflow-airflow-metadata'>                Optional: false
      AIRFLOW__WEBSERVER__SECRET_KEY:                                                                            <set to the key 'webserver-secret-key' in secret 'airflow-webserver-secret-key'>  Optional: false
      AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT:                                                               500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT:                    500
      AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT:                                                                      360.0
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT:                           360.0
      AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE:                                                                  -1
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE:                       -1
      AIRFLOW__CORE__PARALLELISM:                                                                                500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__PARALLELISM:                                     500
      AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG:                                                                   500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG:                        500
      AIRFLOW__SCHEDULER__PARSING_PROCESSES:                                                                     32
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__PARSING_PROCESSES:                          32
      AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD:                                                      60
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD:           60
      AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG:                                                                    500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG:                         500
      AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT:                                                                 360
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT:                      360
      AIRFLOW__KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE:                                             25
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE:  25
      AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL:                                                             600
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL:                  600
      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL:                                                                 600
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL:                      600
      AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP:                                                        500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP:             500
      AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE:                                                      500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE:           500
      AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD:                                                       600
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD:            600
      parallel_test_count:                                                                                       50
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__parallel_test_count:                                            50
      parallel_test_sleep:                                                                                       60
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__parallel_test_sleep:                                            60
      AIRFLOW_IS_K8S_EXECUTOR_POD:                                                                               True
    Mounts:
      /opt/airflow/airflow.cfg from config (ro,path="airflow.cfg")
      /opt/airflow/backups/ from backups (rw)
      /opt/airflow/config/airflow_local_settings.py from config (ro,path="airflow_local_settings.py")
      /opt/airflow/logs from logs (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-k76b5 (ro)
Conditions:
  Type              Status
  Initialized       True
  Ready             False
  ContainersReady   False
  PodScheduled      True
Volumes:
  logs:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  airflow-logs
    ReadOnly:   false
  config:
    Type:      ConfigMap (a volume populated by a ConfigMap)
    Name:      airflow-airflow-config
    Optional:  false
  backups:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  airflow-s3-pvc
    ReadOnly:   false
  kube-api-access-k76b5:
    Type:                    Projected (a volume that contains injected data from multiple sources)
    TokenExpirationSeconds:  3607
    ConfigMapName:           kube-root-ca.crt
    ConfigMapOptional:       <nil>
    DownwardAPI:             true
    ConfigMapName:           openshift-service-ca.crt
    ConfigMapOptional:       <nil>
QoS Class:                   BestEffort
Node-Selectors:              <none>
Tolerations:                 node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                             node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type     Reason       Age        From               Message
  ----     ------       ----       ----               -------
  Normal   Scheduled    111s       default-scheduler  Successfully assigned sb-jniravel/test-1000-task-1-task-44-ff046add566c46bdb78ead1aa72d4e6c to ip-10-0-133-146.ec2.internal
  Warning  FailedMount  <invalid>  kubelet            Unable to attach or mount volumes: unmounted volumes=[logs backups], unattached volumes=[kube-api-access-k76b5 logs config backups]: timed out waiting for the condition

Upvotes: 0

Views: 300

Answers (1)

zhangkuantian

Reputation: 1

We encountered a similar issue on Airflow version 2.6.3: every pod startup would run a recursive chown/chmod on the 'logs' directory, which timed out when the directory contained a large number of files. The following log entries were observed:

  Normal   DefaultInstanceTypeMatch  5m2s   EciService  [eci.containergroup]The default instanceType used for the current eci instance is 2.0-4.0Gi
  Normal   SuccessfulHitImageCache   5m1s   EciService  [eci.imagecache]Successfully hit image cache imc-xxxxxxxxx, eci will be scheduled with this image cache.
  Warning  FailedMount               2m48s  kubelet     Unable to attach or mount volumes: unmounted volumes=[logs], unattached volumes=[dags spark-defaults sqldwh-dbt airflow-dbt-worker-token-xxxxxx logs config]: timed out waiting for the condition

Inspecting the Kubernetes server logs confirmed that each pod startup was attempting the recursive chown/chmod on the 'logs' directory. We resolved the issue by adding the following configuration:

# Default security context for Airflow
securityContext:
  fsGroupChangePolicy: "OnRootMismatch"

Setting fsGroupChangePolicy: "OnRootMismatch" tells Kubernetes to skip the recursive ownership and permission change when the root of the volume already matches the expected fsGroup, which avoids the startup-time permission sweep and the resulting mount timeouts. For more details on fsGroupChangePolicy, see: https://kubernetes.io/blog/2020/12/14/kubernetes-release-1.20-fsgroupchangepolicy-fsgrouppolicy/
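In raw pod-spec terms, the setting lives under spec.securityContext. The sketch below shows where it sits relative to the volume mount; the pod name, image tag, and the fsGroup value 65534 are assumptions for illustration, not values from the question:

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: example-worker           # hypothetical pod name
spec:
  securityContext:
    fsGroup: 65534               # assumed group id; adjust to your image's user
    # Recursively chown/chmod the volume only when its root does not
    # already match fsGroup; otherwise skip the expensive sweep.
    fsGroupChangePolicy: "OnRootMismatch"
  containers:
    - name: base
      image: apache/airflow:2.6.3   # placeholder image
      volumeMounts:
        - name: logs
          mountPath: /opt/airflow/logs
  volumes:
    - name: logs
      persistentVolumeClaim:
        claimName: airflow-logs
```

With the Airflow Helm chart, the equivalent effect comes from putting fsGroupChangePolicy into the chart's security context values, as shown in the snippet above.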

Upvotes: 0
