Alex Ortner
Alex Ortner

Reputation: 1228

Spark on Kubernetes: spark-local-dir ERROR: already exists/not unique

I am struggling to understand the Spark documentation in order to set up the local-dir correctly.

Setup:

I am running Spark 3.1.2 on Kubernetes via the Sparkoperator approach. The Number of executor Pods varying on job size and available resources on the cluster. A typical case is that i start the job with 20 requested executors but 3 Pods remain in pending state and spark complete the job with 17 executors.

Base Problem:

I am running in the error "The node was low on resource: ephemeral-storage." due to much spilling of data into the default local-dir created via empty-diron the kubernetes nodes.

This is a known issue and it should be solved by pointing the local-dir to a mounted presistent volume.

I tried to approaches but both are not working:

Approach 1:

Following the documentation https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage I added the following options into the spark-config

"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName": "tmp-spark-spill"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass": "csi-rbd-sc"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit": "3000Gi"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path": ="/spill-data"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly": "false"

the full yaml looks like

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: job1
  namespace: spark
spec:
  serviceAccount: spark
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "xxx/spark-py:app-3.1.2"
  imagePullPolicy: Always
  mainApplicationFile: local:///opt/spark/work-dir/nfs/06_dwh_core/jobs/job1/main.py
  sparkVersion: "3.0.0"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 0
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 0
    onSubmissionFailureRetryInterval: 20
  sparkConf:
    "spark.default.parallelism": "400"
    "spark.sql.shuffle.partitions": "400"
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
    "spark.sql.debug.maxToStringFields": "1000"
    "spark.ui.port": "4045"
    "spark.driver.maxResultSize": "0" 
    "spark.kryoserializer.buffer.max": "512"
    "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName": "tmp-spark-spill"
    "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass": "csi-rbd-sc"
    "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit": "3000Gi"
    "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path": ="/spill-data"
    "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly": "false"
  driver:
    cores: 1
    memory: "20G"
    labels:
      version: 3.1.2
    serviceAccount: spark
    volumeMounts:
      - name: nfs
        mountPath: /opt/spark/work-dir/nfs
  executor:
    cores: 20
    instances: 20
    memory: "150G"
    labels:
      version: 3.0.0
    volumeMounts:
      - name: nfs
        mountPath: /opt/spark/work-dir/nfs
  volumes:
    - name: nfs
      nfs:
        server: xxx
        path: /xxx
        readOnly: false

Issue 1:

this results in an error saying that the pvc already exists and it only creates effectively one executor.

io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://kubernetes.default.svc/api/v1/namespaces/spark-poc/persistentvolumeclaims. Message: persistentvolumeclaims "tmp-spark-spill" already exists. Received status: Status(apiVersion=v1, code=409, details=StatusDetails(causes=[], group=null, kind=persistentvolumeclaims, name=tmp-spark-spill, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=persistentvolumeclaims "tmp-spark-spill" already exists, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=AlreadyExists, status=Failure, additionalProperties={}).

Do I have to define this local-dir claims for every executor? kind of

 "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName": "tmp-spark-spill"
.
.
.
 "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-2.options.claimName": "tmp-spark-spill"
.
.
.
 "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-3.options.claimName": "tmp-spark-spill"
.
.
.

But how can I make it dynamicaly if I have changing numbers of executors? Is it not automatically picking it up from the executor config?

Approach 2:

I created an pvc myself mounted it as volume and set the local-dir as spark config parameter

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: pvc-spark-spill
  namespace: spark-poc
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 3000Gi
  storageClassName: csi-rbd-sc
  volumeMode: Filesystem

mounted into to executors like

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: job1
  namespace: spark
spec:
  serviceAccount: spark
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "xxx/spark-py:app-3.1.2"
  imagePullPolicy: Always
  mainApplicationFile: local:///opt/spark/work-dir/nfs/06_dwh_core/jobs/job1/main.py
  sparkVersion: "3.0.0"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 0
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 0
    onSubmissionFailureRetryInterval: 20
  sparkConf:
    "spark.default.parallelism": "400"
    "spark.sql.shuffle.partitions": "400"
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
    "spark.sql.debug.maxToStringFields": "1000"
    "spark.ui.port": "4045"
    "spark.driver.maxResultSize": "0" 
    "spark.kryoserializer.buffer.max": "512"
    "spark.local.dir": "/spill"
  driver:
    cores: 1
    memory: "20G"
    labels:
      version: 3.1.2
    serviceAccount: spark
    volumeMounts:
      - name: nfs
        mountPath: /opt/spark/work-dir/nfs
  executor:
    cores: 20
    instances: 20
    memory: "150G"
    labels:
      version: 3.0.0
    volumeMounts:
      - name: nfs
        mountPath: /opt/spark/work-dir/nfs
      - name: pvc-spark-spill
        mountPath: /spill
  volumes:
    - name: nfs
      nfs:
        server: xxx
        path: /xxx
        readOnly: false
    - name: pvc-spark-spill
      persistentVolumeClaim:
        claimName: pvc-spark-spill

Issue 2

This approach fails with the message that the /spill must be unique.

 Message: Pod "job1-driver" is invalid: spec.containers[0].volumeMounts[7].mountPath: Invalid value: "/spill": must be unique.

Summary and Questions

It seems that every executor needs his own pvc or at least his own folder on the pvc to spill his data. But how do I configure it correctly? I am not getting it from the documentation

Thanks for your help Alex

Upvotes: 0

Views: 1787

Answers (1)

Vish
Vish

Reputation: 879

spark should be able to create PVC dynamically by setting up claimName= OnDemand. Attaching multiple pods for the same pvc will be issue on Kubernetes end

attaching screenshot for documentation config screenshot

You can look into nfs share which will work outside kubenetes manage volumes. Example https://www.datamechanics.co/blog-post/apache-spark-3-1-release-spark-on-kubernetes-is-now-ga

Upvotes: 1

Related Questions