Reputation: 737
I have a Spark cluster on Google Dataproc using Compute Engine. The cluster has 1 master node with 4 cores and 16 GB of RAM, and 5 worker nodes with 8 cores and 32 GB of RAM each.
When running SparkConf().getAll(), I get this result:
[('spark.eventLog.enabled', 'true'),
('spark.dynamicAllocation.minExecutors', '1'),
('spark.driver.maxResultSize', '2048m'),
('spark.executor.memory', '12859m'),
('spark.yarn.am.memory', '640m'),
('spark.executor.cores', '4'),
('spark.eventLog.dir',
'gs://dataproc-temp-europe-west1-907569830041-jsgvqmyn/0255e376-31c9-4b52-8e63-a4fe6188eba3/spark-job-history'),
('spark.executor.instances', '2'),
('spark.yarn.unmanagedAM.enabled', 'true'),
('spark.submit.deployMode', 'client'),
('spark.extraListeners',
'com.google.cloud.spark.performance.DataprocMetricsListener'),
('spark.driver.memory', '4096m'),
('spark.sql.cbo.joinReorder.enabled', 'true'),
('spark.sql.autoBroadcastJoinThreshold', '96m'),
('spark.shuffle.service.enabled', 'true'),
('spark.metrics.namespace',
'app_name:${spark.app.name}.app_id:${spark.app.id}'),
('spark.scheduler.mode', 'FAIR'),
('spark.yarn.historyServer.address', 'congenial-sturdy-bassoon-m:18080'),
('spark.sql.adaptive.enabled', 'true'),
('spark.yarn.jars', 'local:/usr/lib/spark/jars/*'),
('spark.scheduler.minRegisteredResourcesRatio', '0.0'),
('spark.hadoop.hive.execution.engine', 'mr'),
('spark.app.name', 'PySparkShell'),
('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version', '2'),
('spark.dynamicAllocation.maxExecutors', '10000'),
('spark.ui.proxyBase', '/proxy/application_1663842742689_0013'),
('spark.master', 'yarn'),
('spark.ui.port', '0'),
('spark.sql.catalogImplementation', 'hive'),
('spark.rpc.message.maxSize', '512'),
('spark.executorEnv.OPENBLAS_NUM_THREADS', '1'),
('spark.submit.pyFiles', ''),
('spark.yarn.isPython', 'true'),
('spark.dynamicAllocation.enabled', 'true'),
('spark.ui.showConsoleProgress', 'true'),
('spark.history.fs.logDirectory',
'gs://dataproc-temp-europe-west1-907569830041-jsgvqmyn/0255e376-31c9-4b52-8e63-a4fe6188eba3/spark-job-history'),
('spark.sql.cbo.enabled', 'true')]
I don't understand why the parameter spark.executor.memory is set to 12859m when I have 32g PER WORKER; the same goes for spark.executor.cores, which is set to 4 when each of my workers has 8 cores.
Is it normal to use so few resources, or should I set these values when starting my SparkSession? The code I use for now is:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('my_app') \
    .getOrCreate()
I read something about yarn.nodemanager.resource.memory-mb, but I am not sure whether it applies to PySpark clusters.
Thank you in advance for your help.
Edit: To add more context, I am trying to read 10M+ JSON files from Google Cloud Storage, and whatever I try, I end up with an OOM error from the JVM. Is there something I can set specifically to solve that kind of problem?
Upvotes: 2
Views: 263
Reputation: 626
Ideally, you can use up to 75 to 80 percent of a node's resources for a single executor. Let's say you have a node with 8 cores and 16 GB of RAM: you can use around 6 cores and 12 GB of RAM for Spark, leaving the remaining resources for other operations (OS, memory allocation, etc.) on that VM or pod.
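Applied to one of the 8-core / 32 GB workers from the question, that rule of thumb would look roughly like this (a minimal sketch; the 75% fraction and the idea that the remainder covers the OS and YARN daemons are assumptions, not exact Dataproc defaults):

# Rough sizing sketch: keep ~75% of a worker for Spark executors
cores_per_node = 8
memory_per_node_gb = 32
usable_fraction = 0.75  # assumption: leave ~25% for OS, YARN daemons, overhead

usable_cores = int(cores_per_node * usable_fraction)          # 6
usable_memory_gb = int(memory_per_node_gb * usable_fraction)  # 24

print(f"~{usable_cores} cores and ~{usable_memory_gb} GB per worker for executors")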
This doc has details on how to size executors for Spark: https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html#:~:text=Leave%201%20core%20per%20node,)%20%3D%20150%2F5%20%3D%2030
You can use these parameters in your Spark config: --num-executors, --executor-cores, and --executor-memory. Play around with your Spark job to see which configuration and infrastructure suit your use case.
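For instance, here is a hedged sketch of setting the equivalent properties when building the session from PySpark (spark.executor.instances, spark.executor.cores, and spark.executor.memory mirror --num-executors, --executor-cores, and --executor-memory; the values below are placeholders showing the syntax, not tuned recommendations):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('my_app')
    .config('spark.executor.cores', '6')      # placeholder value
    .config('spark.executor.memory', '22g')   # placeholder value
    .config('spark.executor.instances', '5')  # only the initial count while dynamic allocation is enabled
    .getOrCreate()
)

The same keys can also be passed on the spark-submit command line with the flags named above.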
Upvotes: 1