The Phoenician

Reputation: 73

Is it still necessary to fine-tune Spark config parameters when using Google Cloud Dataproc?

To elaborate:

Upvotes: 1

Views: 1601

Answers (2)

howie

Reputation: 2705

The answer is yes. It depends on your Spark app's behavior, how many VMs you run, and what type of VM you use. The following is an example of my tuning parameters.

default_parallelism=512

PROPERTIES="\
spark:spark.executor.cores=2,\
spark:spark.executor.memory=8g,\
spark:spark.executor.memoryOverhead=2g,\
spark:spark.driver.memory=6g,\
spark:spark.driver.maxResultSize=6g,\
spark:spark.kryoserializer.buffer=128m,\
spark:spark.kryoserializer.buffer.max=1024m,\
spark:spark.serializer=org.apache.spark.serializer.KryoSerializer,\
spark:spark.default.parallelism=${default_parallelism},\
spark:spark.rdd.compress=true,\
spark:spark.network.timeout=3600s,\
spark:spark.rpc.message.maxSize=256,\
spark:spark.io.compression.codec=snappy,\
spark:spark.shuffle.service.enabled=true,\
spark:spark.sql.shuffle.partitions=256,\
spark:spark.sql.files.ignoreCorruptFiles=true,\
yarn:yarn.nodemanager.resource.cpu-vcores=8,\
yarn:yarn.scheduler.minimum-allocation-vcores=2,\
yarn:yarn.scheduler.maximum-allocation-vcores=4,\
yarn:yarn.nodemanager.vmem-check-enabled=false,\
capacity-scheduler:yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"

gcloud dataproc clusters create ${GCS_CLUSTER} \
       --scopes cloud-platform \
       --image pyspark-with-conda-v2-365 \
       --bucket  spark-data \
       --zone  asia-east1-b  \
       --master-boot-disk-size  500GB \
       --master-machine-type n1-highmem-2 \
       --num-masters 1 \
       --num-workers 2 \
       --worker-machine-type n1-standard-8 \
       --num-preemptible-workers 2 \
       --preemptible-worker-boot-disk-size 500GB \
       --properties "${PROPERTIES}"

Upvotes: 1

Dennis Huo

Reputation: 10707

Dataproc indeed provides smart defaults based on machine types (even custom machine types) and cluster shape, intended as a best "one-size-fits-all" setting that balances the efficiency of more threads per JVM against the limits of shared resource pools per JVM. Roughly, machines are carved up to fit 2 executors per machine, and each executor is given half a machine's worth of threads (so on an n1-standard-8, for example, you'd expect 2 executors, each capable of running 4 tasks in parallel).
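To make that carving concrete, here is a rough sketch of what it implies for an n1-standard-8 worker (8 vCPUs, roughly 30 GB RAM). The figures are illustrative, not the exact values Dataproc computes, which also depend on image version and memory reserved for the OS and daemons:

# Illustration only -- approximate default carving on an n1-standard-8 worker:
#   spark.executor.cores   = 4      (half the machine's threads per executor)
#   spark.executor.memory ~= half of the memory YARN exposes on the node,
#                            minus the executor memory overhead
# => 2 executors per worker, each able to run 4 tasks in parallel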

Keep in mind that YARN is known to incorrectly report vcores for multi-threaded Spark executors, so you may see only two YARN "vcores" occupied when running a big Spark job on Dataproc. You can verify that all cores are actually being used by looking at the Spark AppMaster pages, running ps on a worker, or watching CPU usage on the Dataproc Cloud Console page.
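For instance, one quick way to spot-check this from a worker node (an illustrative command, not Dataproc-specific tooling; the executor process name can vary by Spark version):

# List one line per thread for the Spark executor JVMs on this worker;
# seeing far more than 2 threads confirms the executors are multi-threaded.
ps -eLf | grep [C]oarseGrainedExecutorBackend | wc -l

This counts all JVM threads, not just task threads, so treat it as a rough indicator; watching per-core utilization with top tells a similar story.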

However, these types of settings are never universally 100% "optimal" and Dataproc doesn't yet automatically predict settings based on the actual workload or historical workloads that you've run. So, any settings purely based on cluster shape can't be 100% optimal for all workloads that run on that cluster.

Long story short, on Dataproc you shouldn't have to worry about explicit tuning under most circumstances unless you're trying to squeeze out every last ounce of efficiency, but you can always override Dataproc's settings with your own properties, either at cluster-creation or job-submission time (a sketch of a job-submission override follows the list below). A few points to consider:

  • If your workload is more CPU-heavy than memory-heavy, consider using "highcpu" machine types; Dataproc will automatically have each executor allocate less memory per CPU.
  • If your workload is more memory-heavy, consider "highmem" machine types.
  • If your inputs aren't splittable and/or are highly compressed (like .csv.gz files), you're more likely to run into memory issues, since the usual parallelism computations won't know whether the input data will blow up to be larger than expected. In these cases you may need to override executor memory to be larger.
  • If you're using subprocesses or native libraries, like calling ffmpeg from worker tasks, those tasks will consume physical memory outside of YARN/Spark's knowledge; you may need to adjust memory limits in these cases too, either by reducing cores per executor or increasing executor memory overhead.
  • If you have something that's very IO-bound or that blocks on other async calls (like hitting a slow external web endpoint from your tasks), you may benefit from cranking up cores per executor; Spark will then run more tasks than there are CPUs, but since the tasks are mostly waiting on IO, this can improve throughput.
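As a sketch of the job-submission route mentioned above (the job file name and property values here are placeholders, not recommendations), individual jobs can override properties without touching the cluster-level settings:

# Hypothetical example: give one memory-hungry job larger executors at
# submission time, leaving the cluster defaults alone.
gcloud dataproc jobs submit pyspark my_job.py \
    --cluster ${GCS_CLUSTER} \
    --properties spark.executor.memory=12g,spark.executor.memoryOverhead=4g,spark.executor.cores=2

Note that at job-submission time the Spark keys are passed directly, without the spark: prefix used in the cluster-level --properties string above.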

Upvotes: 8
