Reputation: 2188
For example, I currently have a DataProc cluster consisting of a master and 4 workers; each machine has 8 vCPUs and 30GB of memory.
Whenever I submit a job to the cluster, the cluster commits a max of 11GB total, and only engages 2 worker nodes to do the work, and on those nodes only uses 2 of the vCPU resources. This makes a job that should only take a few minutes take nearly an hour to execute.
I have tried editing the spark-defaults.conf file on the master node, and have tried running my spark-submit command with the arguments --executor-cores 4 --executor-memory 20g --num-executors 4, but neither has had any effect.
These clusters will only be spun up to perform a single task and will then be torn down, so the resources do not need to be held for any other jobs.
Upvotes: 4
Views: 1510
Reputation: 2188
I managed to resolve my issue by changing the scheduler to FIFO instead of FAIR, using the below at the end of my create command:
--properties spark:spark.scheduler.mode=FIFO
Upvotes: 6
Reputation: 10677
You might want to check whether what you're seeing is related to "Dataproc set number of vcores per executor container": the number of vcores in use reported by YARN is known to be incorrect, but that is only a cosmetic defect. On a Dataproc cluster with 8-core machines, the default configuration already sets 4 cores per executor; if you click through YARN to the Spark appmaster, you should see that Spark is indeed able to pack 4 concurrent tasks per executor.
That part explains what might look like "only using 2 vCPU" per node.
The fact that the job only engages two of the worker nodes hints that there's more to it, though; the amount of parallelism you get is related to how well the data is partitioned. If you have input files like gzip files that can't be split, then unfortunately there's not an easy way to increase input parallelism. However, at least in later pipeline stages, or if you do have splittable files, you can increase parallelism by specifying the number of Spark partitions at read time or by calling repartition in your code. Depending on your input size, you could also experiment with decreasing fs.gs.block.size; that defaults to 134217728 (128MB), but you could set it to a half or a quarter of that, either at cluster creation time:
--properties core:fs.gs.block.size=67108864
or at job submission time:
--properties spark.hadoop.fs.gs.block.size=67108864
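To get a feel for why a smaller block size can help, here is a rough back-of-the-envelope sketch in Python. It assumes a single splittable input file and treats the number of input splits as simply the file size divided by the block size; the real split count depends on the input format and may differ slightly. The 1 GiB input size and the helper name estimated_splits are hypothetical, chosen only for illustration.

```python
import math

DEFAULT_BLOCK_SIZE = 134217728  # fs.gs.block.size default (128MB), as quoted above


def estimated_splits(input_bytes: int, block_size_bytes: int) -> int:
    """Rough estimate of input splits for one splittable file.

    Assumption: one split per block-sized chunk. The actual number is
    decided by the input format and can differ a little.
    """
    if input_bytes <= 0:
        return 0
    return math.ceil(input_bytes / block_size_bytes)


# Hypothetical input: a single 1 GiB splittable file.
input_bytes = 1024 ** 3

# Cluster from the question: 4 workers with 8 vCPUs each.
total_vcpus = 4 * 8

for block_size in (DEFAULT_BLOCK_SIZE, DEFAULT_BLOCK_SIZE // 2, DEFAULT_BLOCK_SIZE // 4):
    splits = estimated_splits(input_bytes, block_size)
    print(f"block size {block_size}: about {splits} input tasks for {total_vcpus} vCPUs")
```

Under these assumptions, the default block size would give fewer input tasks than the cluster has vCPUs, while a quarter of the default would give roughly one task per vCPU. For an unsplittable file such as gzip, the estimate does not apply, since the whole file is read by one task regardless of block size.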
Upvotes: 4