elgoog

Reputation: 509

Apache Spark running out of memory with a smaller number of partitions

I have a Spark application that keeps running out of memory. The cluster has two nodes with around 30 GB of RAM, and the input data size is a few hundred GB.

The application is a Spark SQL job: it reads data from HDFS, creates a table and caches it, then runs some Spark SQL queries and writes the result back to HDFS.

Initially I split the data into 64 partitions and got OOM errors; I was then able to fix the memory issue by using 1024 partitions. But why does using more partitions help solve the OOM issue?
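Schematically, the job does something like this (the paths, table name and query here are simplified placeholders; the only thing I change between the failing and the working run is the partition count):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("sql-job")
      .getOrCreate()

    val numPartitions = 1024  // 64 gives OOM, 1024 works

    val df = spark.read
      .parquet("hdfs:///input/data")      // placeholder input path
      .repartition(numPartitions)

    df.createOrReplaceTempView("events")  // placeholder table name
    df.cache()

    val result = spark.sql("SELECT key, count(*) AS cnt FROM events GROUP BY key")
    result.write.parquet("hdfs:///output/result")  // placeholder output path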

Upvotes: 4

Views: 8373

Answers (3)

João Paulo

Reputation: 21

Rockie's answer is right, but it doesn't get to the point of your question.

When you cache an RDD, all of its partitions are persisted (according to the storage level), respecting the spark.memory.fraction and spark.memory.storageFraction properties.

Besides that, at a certain moment Spark can automatically drop some partitions out of memory (or you can do this manually for the entire RDD with RDD.unpersist()), according to the documentation.
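A minimal sketch of those knobs and calls (the values and the path are just illustrative, not recommendations):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    val spark = SparkSession.builder()
      .appName("cache-demo")
      .config("spark.memory.fraction", "0.6")        // heap share for execution + storage
      .config("spark.memory.storageFraction", "0.5") // part of that region protected for cached blocks
      .getOrCreate()

    val rdd = spark.sparkContext
      .textFile("hdfs:///input/data")                // illustrative path
      .persist(StorageLevel.MEMORY_AND_DISK)         // evicted partitions spill to disk instead of failing

    println(rdd.count())  // materializes (and caches) the partitions

    rdd.unpersist()       // manually drops every cached partition of this RDD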

Thus, since you have more (and therefore smaller) partitions, Spark can keep fewer of them in the LRU cache at a time without causing OOM (this may have a negative impact too, like the need to re-cache evicted partitions).

Another important point is that when you write the result back to HDFS using X partitions, you have X tasks for all your data. Take the total data size and divide it by X: that is roughly the memory each task needs, and tasks are executed one per (virtual) core. So it's not difficult to see why X = 64 leads to OOM while X = 1024 does not.
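To make that arithmetic concrete, assuming 200 GB of input (an arbitrary figure in the "few hundred GB" range):

    val totalBytes = 200L * 1024 * 1024 * 1024   // assumed 200 GiB of input

    def perTaskGiB(partitions: Int): Double =
      totalBytes.toDouble / partitions / (1024.0 * 1024 * 1024)

    println(f"64 partitions   -> ~${perTaskGiB(64)}%.2f GiB per task")    // ~3.13 GiB
    println(f"1024 partitions -> ~${perTaskGiB(1024)}%.2f GiB per task")  // ~0.20 GiB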

Upvotes: 2

vaquar khan

Reputation: 11479

"Spark's operators spill data to disk if it does not fit in memory, allowing it to run well on any sized data." Still, the issue here is large partitions generating OOM.

Partitions determine the degree of parallelism. The Apache Spark documentation says that the number of partitions should be at least equal to the number of cores in the cluster.

Fewer partitions result in:

  • Less concurrency,
  • Increased memory pressure for transformations that involve a shuffle,
  • More susceptibility to data skew.

Too many partitions can also have a negative impact:

  • Too much time spent scheduling many small tasks

When your data is stored on HDFS, it is already partitioned into 64 MB or 128 MB blocks, depending on your HDFS configuration. When reading HDFS files with Spark, the number of DataFrame partitions (df.rdd.getNumPartitions) depends on the following properties (see the sketch after the list):

  • spark.default.parallelism (Cores available for the application)
  • spark.sql.files.maxPartitionBytes (default 128MB)
  • spark.sql.files.openCostInBytes (default 4MB)
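A sketch of how to inspect and tune this when reading (the values shown are the defaults, and the path is a placeholder):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("partition-count-demo")
      .config("spark.sql.files.maxPartitionBytes", "134217728")  // 128 MB (default)
      .config("spark.sql.files.openCostInBytes", "4194304")      // 4 MB (default)
      .getOrCreate()

    val df = spark.read.parquet("hdfs:///input/data")  // placeholder path

    // Number of partitions Spark actually created for the read
    println(df.rdd.getNumPartitions)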


During a Spark Summit talk, Aaron Davidson gave some tips about partition tuning. He also summed up a reasonable number of partitions in the three points below:

  • Commonly between 100 and 10,000 partitions (note: the two points below are more reliable, since "commonly" depends on the size of the dataset and the cluster)
  • Lower bound: at least 2 × the number of cores in the cluster (see the sketch below)
  • Upper bound: each task should finish within about 100 ms
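As a rough starting point based on that lower bound (not a hard rule; tune from there, and the path is a placeholder):

    val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()

    // defaultParallelism is normally the total number of executor cores
    val minPartitions = 2 * spark.sparkContext.defaultParallelism

    val df = spark.read.parquet("hdfs:///input/data")  // placeholder path
    val tuned =
      if (df.rdd.getNumPartitions < minPartitions) df.repartition(minPartitions)
      else df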

Upvotes: 2

Rockie Yang

Reputation: 4925

The solution to big data is partitioning (divide and conquer), since not all of the data can fit into memory, nor can it be processed on a single machine.

Each partition can fit into memory and be processed (map) in a relatively short time. After the data is processed for each partition, the results need to be merged (reduce). This is traditional MapReduce.

Splitting the data into more partitions means that each partition gets smaller.

[Edit]

Spark uses a revolutionary concept called the Resilient Distributed Dataset (RDD).

  • There are two types of operations: transformations and actions.
  • Transformations map one RDD to another. They are lazily evaluated. These RDDs can be treated as intermediate results we don't want to materialize yet.
  • Actions are used when you really want to get the data. The resulting RDD/data is what we actually want, like taking the top failing records.
  • Spark analyzes all the operations and builds a DAG (Directed Acyclic Graph) before execution.
  • Spark starts computing from the source RDD when an action is fired, and then forgets the intermediate results.

[Image: Spark DAG]
(source: cloudera.com)
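A tiny illustration of the transformation/action distinction (the data here is just generated numbers):

    val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()
    val sc = spark.sparkContext

    val numbers = sc.parallelize(1 to 1000000, numSlices = 8)

    // Transformations: only recorded in the DAG, nothing runs yet
    val evens   = numbers.filter(_ % 2 == 0)
    val squares = evens.map(n => n.toLong * n)

    // Action: triggers execution of the whole lineage
    println(squares.take(5).mkString(", "))  // 4, 16, 36, 64, 100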

I made a small screencast for a presentation on YouTube: Spark Makes Big Data Sparking.

Upvotes: 4
