Reputation: 1228
I am struggling to understand the Spark documentation in order to set up the local-dir correctly.
I am running Spark 3.1.2 on Kubernetes via the Spark Operator. The number of executor pods varies with job size and the resources available on the cluster. A typical case is that I start a job with 20 requested executors, 3 pods remain in pending state, and Spark completes the job with 17 executors.
I am running into the error "The node was low on resource: ephemeral-storage." because a lot of data is spilled into the default local-dir, which is created as an emptyDir on the Kubernetes nodes.
This is a known issue, and it should be solved by pointing the local-dir to a mounted persistent volume.
I tried two approaches, but neither works:
Following the documentation (https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage), I added the following options to the Spark config:
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName": "tmp-spark-spill"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass": "csi-rbd-sc"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit": "3000Gi"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path": ="/spill-data"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly": "false"
The full YAML looks like this:
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: job1
namespace: spark
spec:
serviceAccount: spark
type: Python
pythonVersion: "3"
mode: cluster
image: "xxx/spark-py:app-3.1.2"
imagePullPolicy: Always
mainApplicationFile: local:///opt/spark/work-dir/nfs/06_dwh_core/jobs/job1/main.py
sparkVersion: "3.0.0"
restartPolicy:
type: OnFailure
onFailureRetries: 0
onFailureRetryInterval: 10
onSubmissionFailureRetries: 0
onSubmissionFailureRetryInterval: 20
sparkConf:
"spark.default.parallelism": "400"
"spark.sql.shuffle.partitions": "400"
"spark.serializer": "org.apache.spark.serializer.KryoSerializer"
"spark.sql.debug.maxToStringFields": "1000"
"spark.ui.port": "4045"
"spark.driver.maxResultSize": "0"
"spark.kryoserializer.buffer.max": "512"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName": "tmp-spark-spill"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass": "csi-rbd-sc"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit": "3000Gi"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path": ="/spill-data"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly": "false"
driver:
cores: 1
memory: "20G"
labels:
version: 3.1.2
serviceAccount: spark
volumeMounts:
- name: nfs
mountPath: /opt/spark/work-dir/nfs
executor:
cores: 20
instances: 20
memory: "150G"
labels:
version: 3.0.0
volumeMounts:
- name: nfs
mountPath: /opt/spark/work-dir/nfs
volumes:
- name: nfs
nfs:
server: xxx
path: /xxx
readOnly: false
This results in an error saying that the PVC already exists, and effectively only one executor is created:
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://kubernetes.default.svc/api/v1/namespaces/spark-poc/persistentvolumeclaims. Message: persistentvolumeclaims "tmp-spark-spill" already exists. Received status: Status(apiVersion=v1, code=409, details=StatusDetails(causes=[], group=null, kind=persistentvolumeclaims, name=tmp-spark-spill, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=persistentvolumeclaims "tmp-spark-spill" already exists, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=AlreadyExists, status=Failure, additionalProperties={}).
Do I have to define these local-dir claims for every executor? Something like:
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName": "tmp-spark-spill"
...
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-2.options.claimName": "tmp-spark-spill"
...
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-3.options.claimName": "tmp-spark-spill"
...
But how can I make this dynamic if the number of executors changes? Is it not picked up automatically from the executor config?
For the second approach, I created a PVC myself, mounted it as a volume, and pointed the local-dir at it via the Spark config parameter spark.local.dir:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: pvc-spark-spill
  namespace: spark-poc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 3000Gi
  storageClassName: csi-rbd-sc
  volumeMode: Filesystem
and mounted it into the executors like this:
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: job1
namespace: spark
spec:
serviceAccount: spark
type: Python
pythonVersion: "3"
mode: cluster
image: "xxx/spark-py:app-3.1.2"
imagePullPolicy: Always
mainApplicationFile: local:///opt/spark/work-dir/nfs/06_dwh_core/jobs/job1/main.py
sparkVersion: "3.0.0"
restartPolicy:
type: OnFailure
onFailureRetries: 0
onFailureRetryInterval: 10
onSubmissionFailureRetries: 0
onSubmissionFailureRetryInterval: 20
sparkConf:
"spark.default.parallelism": "400"
"spark.sql.shuffle.partitions": "400"
"spark.serializer": "org.apache.spark.serializer.KryoSerializer"
"spark.sql.debug.maxToStringFields": "1000"
"spark.ui.port": "4045"
"spark.driver.maxResultSize": "0"
"spark.kryoserializer.buffer.max": "512"
"spark.local.dir": "/spill"
driver:
cores: 1
memory: "20G"
labels:
version: 3.1.2
serviceAccount: spark
volumeMounts:
- name: nfs
mountPath: /opt/spark/work-dir/nfs
executor:
cores: 20
instances: 20
memory: "150G"
labels:
version: 3.0.0
volumeMounts:
- name: nfs
mountPath: /opt/spark/work-dir/nfs
- name: pvc-spark-spill
mountPath: /spill
volumes:
- name: nfs
nfs:
server: xxx
path: /xxx
readOnly: false
- name: pvc-spark-spill
persistentVolumeClaim:
claimName: pvc-spark-spill
This approach fails with the message that the mount path /spill must be unique:
Message: Pod "job1-driver" is invalid: spec.containers[0].volumeMounts[7].mountPath: Invalid value: "/spill": must be unique.
It seems that every executor needs its own PVC, or at least its own folder on the PVC, to spill its data. But how do I configure this correctly? I cannot work it out from the documentation.
Thanks for your help, Alex
Upvotes: 0
Views: 1787
Reputation: 879
Spark should be able to create the PVCs dynamically if you set claimName to OnDemand; attaching multiple pods to the same PVC is an issue on the Kubernetes side. See the sketch below.
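A rough sketch of what that could look like in the operator's sparkConf, reusing the storage class and mount path from the question (the sizeLimit value is only an example and applies to each created PVC, not to a shared pool):

sparkConf:
  # "OnDemand" makes Spark create a separate, generated-name PVC for each executor pod,
  # so the executors no longer race to create one fixed claim name.
  "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName": "OnDemand"
  "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass": "csi-rbd-sc"
  "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit": "500Gi"
  "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path": "/spill-data"
  "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly": "false"

Because the volume name starts with spark-local-dir-, Spark uses the mount as local/spill storage instead of the default emptyDir, so spark.local.dir does not need to be set separately.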
This is covered in the local-storage section of the Spark-on-Kubernetes documentation linked in the question.
You can also look into an NFS share, which works outside of Kubernetes-managed volumes (see the sketch after this paragraph). Example: https://www.datamechanics.co/blog-post/apache-spark-3-1-release-spark-on-kubernetes-is-now-ga
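A minimal sketch of the NFS variant, assuming an NFS export reachable from the executor pods (nfs.example.com and /export/spark-spill are placeholders, not values from the question). Spark 3.1 can mount nfs volumes via the Kubernetes volume config, and a volume named spark-local-dir-* is again picked up as local storage:

sparkConf:
  # Placeholders: point these at your own NFS server and export path.
  "spark.kubernetes.executor.volumes.nfs.spark-local-dir-1.options.server": "nfs.example.com"
  "spark.kubernetes.executor.volumes.nfs.spark-local-dir-1.options.path": "/export/spark-spill"
  "spark.kubernetes.executor.volumes.nfs.spark-local-dir-1.mount.path": "/spill-data"
  "spark.kubernetes.executor.volumes.nfs.spark-local-dir-1.mount.readOnly": "false"

Each executor writes into its own blockmgr-*/spark-* subdirectories, so sharing one export is workable in principle, but shuffle-heavy jobs will then be limited by the NFS server's throughput.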
Upvotes: 1