Naresh

Reputation: 5407

How to dynamically choose spark.sql.shuffle.partitions

I am currently processing data with Spark: in foreachPartition I open a connection to MySQL and insert the records into the database in batches of 1000. As mentioned in the Spark documentation, the default value of spark.sql.shuffle.partitions is 200, but I want to set it dynamically. So how do I calculate it, such that I neither pick a very high value that degrades performance nor a very small value that causes OOM?
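For reference, here is a minimal sketch of that write pattern, assuming df is the DataFrame being written and using a hypothetical MySQL table my_table with columns id and value and placeholder connection details:

import java.sql.DriverManager

df.foreachPartition { rows: Iterator[org.apache.spark.sql.Row] =>
  // One connection per partition; flush inserts every 1000 rows
  val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO my_table (id, value) VALUES (?, ?)")
  try {
    var count = 0
    rows.foreach { row =>
      stmt.setLong(1, row.getLong(0))
      stmt.setString(2, row.getString(1))
      stmt.addBatch()
      count += 1
      if (count % 1000 == 0) stmt.executeBatch()
    }
    stmt.executeBatch()   // flush any remaining rows
  } finally {
    stmt.close()
    conn.close()
  }
}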

Upvotes: 8

Views: 5565

Answers (2)

Ajay Ahuja

Reputation: 1323

Try the option below:

val numExecutors         = spark.conf.get("spark.executor.instances").toInt
val numExecutorsCores    = spark.conf.get("spark.executor.cores").toInt
val numShufflePartitions = (numExecutors * numExecutorsCores)

spark.conf.set("spark.sql.shuffle.partitions", numShufflePartitions)

This sets the number of shuffle partitions based on the executors and executor cores used by your Spark job, which helps avoid both performance degradation and Out Of Memory issues.

If you still run out of memory, set the property below:

spark.conf.set("spark.executor.memoryOverhead", "3G")

Another option is to calculate the DataFrame size, divide it by the HDFS block size, and use the resulting number to set spark.sql.shuffle.partitions.
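A rough sketch of that calculation, assuming Spark 2.3+ (where the optimizer statistics are exposed via queryExecution.optimizedPlan.stats) and an assumed HDFS block size of 128 MB:

// Estimate the DataFrame size from the optimizer statistics (assumed Spark 2.3+ API)
val dfSizeInBytes        = df.queryExecution.optimizedPlan.stats.sizeInBytes.toLong
val hdfsBlockSize        = 128L * 1024 * 1024   // assumed HDFS block size of 128 MB
val numShufflePartitions = math.max(1, (dfSizeInBytes / hdfsBlockSize).toInt)

spark.conf.set("spark.sql.shuffle.partitions", numShufflePartitions)

Note that sizeInBytes is only the optimizer's estimate, so treat the result as a starting point rather than an exact figure.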

Upvotes: -1

Raju Bairishetti

Reputation: 354

You can use the df.repartition(numPartitions) method for this. You can decide on numPartitions based on the input or intermediate output size and pass it to repartition().

df.repartition(numPartitions)   or rdd.repartition(numPartitions)
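For example, a sketch of deciding numPartitions from a known input size, assuming a hypothetical target of roughly 128 MB per partition:

val inputSizeInBytes        = 64L * 1024 * 1024 * 1024   // assume the input is known to be about 64 GB
val targetBytesPerPartition = 128L * 1024 * 1024         // assumed target of ~128 MB per partition
val numPartitions           = math.max(1, (inputSizeInBytes / targetBytesPerPartition).toInt)

val repartitioned = df.repartition(numPartitions)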

Upvotes: -4
