irbull

Reputation: 2530

Any tips for scaling Spark horizontally

Does anybody have any tips when moving Spark execution from a few large nodes to many, smaller nodes?

I am running a system with 4 executors; each executor has 24GB of RAM and 12 cores. If I try to scale that out to 12 executors with 4 cores and 8GB of RAM each (same total RAM, same total cores, just distributed differently), I run into out of memory errors: Container killed by YARN for exceeding memory limits. 8.8 GB of 8.8 GB physical memory used.

I have increased the number of partitions by a factor of 3 to create more (yet smaller) partitions, but this didn't help.

Does anybody have any tips & tricks when trying to scale Spark horizontally?

Upvotes: 0

Views: 3913

Answers (1)

Wade Jensen

Reputation: 692

This is a pretty broad question. Executor sizing in Spark is a very complicated kind of black magic, and the rules of thumb which were correct in 2015, for example, are obsolete now, just as whatever I say will be obsolete in 6 months with the next release of Spark. A lot comes down to exactly what you are doing and avoiding key skew in your data.

This is a good place to start to learn and develop your own understanding: https://spark.apache.org/docs/latest/tuning.html

There are also a multitude of presentations on Slideshare about tuning Spark; try to read / watch the most recent ones. Be sceptical of anything older than 18 months, and just ignore anything older than 2 years.

I will make the assumption that you are using at least Spark 2.x.

The error you're encountering is indeed because of poor executor sizing. What is happening is that your executors are attempting to do too much at once, and running themselves into the ground as they run out of memory.

All other things being equal, these are the current rules of thumb as I apply them:

The short version

  • 3 - 4 virtual (hyperthreaded) cores and 29GB of RAM is a reasonable default executor size (I will explain why later). If you know nothing else, partition your data well and use that.
  • You should normally aim for a data partition size (in memory) on the order of ~100MB to ~3GB.

The formulae I apply

Executor memory = number of executor cores * partition size * 1.3 (safety factor)

Partition size = (size of data on disk / number of partitions) * deserialisation ratio

The deserialisation ratio is the ratio between the size of the data on disk and the size of data in memory. The Java memory representation of the same data tends to be a decent bit larger than on disk.

You also need to account for whether your data is compressed; many common formats like Parquet and ORC use compression such as gzip or snappy.

For snappy compressed text data (very easily compressed), I use ~10X - 100X. For snappy compressed data with a mix of text, floats, dates etc I see between 3X and 15X typically.
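To make the formulae concrete, here is a rough back-of-the-envelope sketch in Python. The data size, partition count and deserialisation ratio are purely illustrative assumptions; plug in your own numbers.

    # Rough sketch of the sizing formulae above (illustrative numbers only).

    def partition_size_gb(size_on_disk_gb, num_partitions, deser_ratio):
        """Estimated in-memory size of a single partition."""
        return size_on_disk_gb / num_partitions * deser_ratio

    def executor_memory_gb(executor_cores, partition_gb, safety_factor=1.3):
        """Memory an executor needs to process one partition per core at once."""
        return executor_cores * partition_gb * safety_factor

    # Example: 200GB of snappy-compressed data on disk, 1000 partitions,
    # assuming a 5x deserialisation ratio (mixed text / numeric data).
    part_gb = partition_size_gb(200, 1000, 5)   # 1.0 GB per partition
    mem_gb = executor_memory_gb(4, part_gb)     # 5.2 GB for a 4-core executor
    print(f"~{part_gb:.1f} GB per partition, ~{mem_gb:.1f} GB of executor memory needed")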

Number of executor cores = 3 to 4. The right core count depends entirely on how compute- vs memory-intensive your calculation is. Experiment and see what is best for your use case. I have never seen anyone informed on Spark advocate more than 6 cores.

Spark is smart enough to take advantage of data locality, so the larger your executor, the better the chance that your data is PROCESS_LOCAL. More data locality is good, up to a point.

When a JVM gets too large (> 50GB), it begins to operate outside what it was originally designed for, and depending on your garbage collection algorithm, you may begin to see degraded performance and high GC time.

https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html

There also happens to be a performance trick in Java: if your JVM heap is smaller than 32GB, it can use 32-bit compressed object pointers rather than 64-bit pointers, which saves space and reduces cache pressure.

https://docs.oracle.com/javase/7/docs/technotes/guides/vm/performance-enhancements-7.html
https://blog.codecentric.de/en/2014/02/35gb-heap-less-32gb-java-jvm-memory-oddities/

It also so happens that YARN adds 7% or 384MB of RAM (whichever is larger) to your requested executor size as an overhead / safety factor, which is where the 29GB rule of thumb comes from: 29GB + 7% ~= 31GB, keeping the container just under 32GB.
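As a sketch (assuming PySpark on YARN; the exact values are assumptions to tune for your own workload, not a prescription), this is one way to request that 4-core / 29GB executor shape:

    # Sketch: request the 3-4 core / 29GB executor shape discussed above.
    # The 29GB heap stays under the 32GB compressed-pointer threshold, and
    # with YARN's overhead added the container request lands around 31GB.
    # The same settings can be passed to spark-submit via --executor-cores
    # and --executor-memory instead.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("executor-sizing-sketch")
        .config("spark.executor.cores", "4")
        .config("spark.executor.memory", "29g")
        .getOrCreate()
    )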

You mentioned that you are using 12 core, 24GB RAM executors. This sends up a red flag for me.

Why?

Because every "core" in an executor is assigned one "task" at time. A task is equivalent to the work required to compute the transformation of one partition from "stage" A to "stage" B.

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-taskscheduler-tasks.html
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-DAGScheduler-Stage.html

If your executor has 12 cores, then it is going to try to do 12 tasks simultaneously with a 24GB memory budget. 24GB / 12 cores = 2GB per core. If your partitions are greater than 2GB, you will get an out of memory error. If the particular transformation doubles the size of the input (even intermediately), then you need to account for that as well.
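For concreteness, a quick sketch of that arithmetic, using the same safety factor as the formula above:

    # Per-task memory budget for a 12-core / 24GB executor.
    executor_memory_gb = 24
    executor_cores = 12
    safety_factor = 1.3

    per_task_budget_gb = executor_memory_gb / executor_cores    # 2.0 GB
    max_safe_partition_gb = per_task_budget_gb / safety_factor  # ~1.5 GB

    print(f"Budget per concurrent task: {per_task_budget_gb:.1f} GB")
    print(f"Largest 'safe' in-memory partition: {max_safe_partition_gb:.2f} GB")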

Upvotes: 6
