Umesh Kacha

Reputation: 13686

What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?

I am using Spark SQL (actually hiveContext.sql()) with GROUP BY queries and I am running into OOM issues. I am thinking of increasing the value of spark.sql.shuffle.partitions from the default of 200 to 1000, but it is not helping.

I believe each shuffle partition takes a share of the shuffled data, so the more partitions there are, the less data each one has to hold. I am new to Spark. I am using Spark 1.4.0 and I have around 1 TB of uncompressed data to process with hiveContext.sql() GROUP BY queries.
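For concreteness, this is roughly how I am setting it before running the query (a minimal sketch; the table and column names are placeholders, not my real schema):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("groupByJob"))
val hiveContext = new HiveContext(sc)

// raise the number of shuffle partitions from the default 200
hiveContext.setConf("spark.sql.shuffle.partitions", "1000")

// the GROUP BY below should then shuffle into 1000 partitions instead of 200
val grouped = hiveContext.sql(
  "SELECT some_key, count(*) AS cnt FROM some_table GROUP BY some_key")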

Upvotes: 40

Views: 48888

Answers (4)

pbahr

Reputation: 1350

I came across this post from Cloudera about Hive partitioning. Check out the "Pointers" section, which talks about the number of partitions and the number of files in each partition overloading the NameNode, which might cause OOM.

Upvotes: 0

Thomas Decaux

Reputation: 22711

It actually depends on your data and your query; if Spark must load 1 TB, there is something wrong with your design.

Use the superb web UI to see the DAG, i.e. how Spark translates your SQL query into jobs, stages and tasks.

Useful metrics are "Input" and "Shuffle".

  • Partition your data (Hive / directory layout like /year=X/month=X)
  • Use Spark's CLUSTER BY feature to work per data partition (see the sketch after this list)
  • Use the ORC / Parquet file formats because they provide push-down filters, so useless data is not loaded into Spark
  • Analyze the Spark History UI to see how Spark is reading data
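For example, a rough sketch of the partitioned layout plus CLUSTER BY (paths, table and column names are invented for illustration, and it assumes Spark 1.4's DataFrame reader/writer and an input that has year/month columns):

// write the data partitioned by year/month so later queries only read the
// directories they need (/year=X/month=X); Parquet also gives push-down filters
val events = hiveContext.read.parquet("/data/events")
events.write
  .partitionBy("year", "month")
  .parquet("/data/events_partitioned")

// read it back with partition pruning, then let CLUSTER BY redistribute
// the rows by key inside each shuffle partition
hiveContext.read.parquet("/data/events_partitioned")
  .registerTempTable("events_partitioned")

val clustered = hiveContext.sql(
  """SELECT key, value
    |FROM events_partitioned
    |WHERE year = 2015 AND month = 7
    |CLUSTER BY key""".stripMargin)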

Also, could the OOM be happening on your driver?

That is a different issue: at the end, the driver collects the data you ask for. If you ask for too much data, the driver will OOM. Try limiting your query, or write the result to another table (Spark syntax: CREATE TABLE ... AS).
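A rough sketch of the difference (table names are placeholders):

// this pulls every result row back to the driver and can OOM it:
// val rows = hiveContext.sql("SELECT key, count(*) FROM big_table GROUP BY key").collect()

// safer: keep the result distributed by writing it to a table,
// then only fetch a small, limited slice to the driver
hiveContext.sql(
  "CREATE TABLE grouped_result AS SELECT key, count(*) AS cnt FROM big_table GROUP BY key")

val preview = hiveContext.sql("SELECT * FROM grouped_result LIMIT 100").collect()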

Upvotes: 4

nont

Reputation: 9529

If you're running out of memory on the shuffle, try setting spark.sql.shuffle.partitions to 2001.

Spark uses a different data structure for shuffle book-keeping when the number of partitions is greater than 2000:

private[spark] object MapStatus {

  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > 2000) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }
...

I really wish they would let you configure this independently.

By the way, I found this information in a Cloudera slide deck.
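In the question's setup, the tweak would look something like this (just a sketch, assuming the same HiveContext; 2001 simply crosses the 2000 threshold shown above):

// anything above 2000 makes Spark use HighlyCompressedMapStatus, which tracks
// roughly an average block size per map output instead of one size per reduce
// partition, so the shuffle book-keeping itself takes much less memory
hiveContext.sql("SET spark.sql.shuffle.partitions=2001")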

Upvotes: 57

samthebest

Reputation: 31563

OK, so I think your issue is more general. It's not specific to Spark SQL; it's a general problem with Spark, where it ignores the number of partitions you tell it to use when there are only a few input files. Spark seems to create the same number of partitions as the number of files on HDFS, unless you call repartition. So calling repartition ought to work, but it has the caveat of causing a somewhat unnecessary shuffle.
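A minimal sketch of the repartition workaround (my_table and the figure 1000 are placeholders; tune the count to your data):

// read the few input files, then force the partition count up front;
// repartition() causes a full shuffle, which is the caveat mentioned above
val df = hiveContext.sql("SELECT key, value FROM my_table")
val repartitioned = df.repartition(1000)
repartitioned.registerTempTable("my_table_repartitioned")

// the subsequent GROUP BY now starts from 1000 partitions
val grouped = hiveContext.sql(
  "SELECT key, count(*) AS cnt FROM my_table_repartitioned GROUP BY key")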

I raised this question a while ago and still haven't gotten a good answer :(

Spark: increase number of partitions without causing a shuffle?

Upvotes: 9
