Reputation: 5407
I am currently processing data with Spark: for each partition I open a connection to MySQL and insert the records into the database in batches of 1000. As mentioned in the Spark documentation, the default value of spark.sql.shuffle.partitions is 200, but I want to keep it dynamic. So how do I calculate it, so that I neither pick a very high value and degrade performance, nor a very small value and run into OOM?
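For reference, this is roughly the write pattern I mean (a minimal sketch; the JDBC URL, credentials, table and columns are placeholders, and df is the DataFrame being written):
import java.sql.DriverManager

df.rdd.foreachPartition { rows =>
  // one connection per partition, as described above
  val conn = DriverManager.getConnection("jdbc:mysql://host:3306/mydb", "user", "pass")
  val stmt = conn.prepareStatement("INSERT INTO my_table (col1, col2) VALUES (?, ?)")
  var count = 0
  rows.foreach { row =>
    stmt.setString(1, row.getString(0))
    stmt.setString(2, row.getString(1))
    stmt.addBatch()
    count += 1
    if (count % 1000 == 0) stmt.executeBatch() // flush every 1000 rows
  }
  stmt.executeBatch() // flush the remaining rows
  stmt.close()
  conn.close()
}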
Upvotes: 8
Views: 5565
Reputation: 1323
Try the option below -
val numExecutors = spark.conf.get("spark.executor.instances").toInt
val numExecutorsCores = spark.conf.get("spark.executor.cores").toInt
val numShufflePartitions = (numExecutors * numExecutorsCores)
spark.conf.set("spark.sql.shuffle.partitions", numShufflePartitions)
This will help you set the right number of shuffle partitions based on the number of executors and executor cores used by your Spark job, without compromising performance or running into Out Of Memory issues.
If you still run out of memory, then set the below property -
spark.conf.set("spark.executor.memoryOverhead", "3G")
Another option is to calculate the DataFrame size, divide it by the HDFS block size, and use the resulting number to set spark.sql.shuffle.partitions.
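A rough sketch of that second approach (assuming Spark 2.3+, where the optimized plan exposes a size estimate, and a 128 MB HDFS block size - substitute your cluster's value):
val sizeInBytes = df.queryExecution.optimizedPlan.stats.sizeInBytes.toLong
val hdfsBlockSize = 128L * 1024 * 1024 // assumed 128 MB block size
val numPartitions = math.max(1, (sizeInBytes / hdfsBlockSize).toInt)
spark.conf.set("spark.sql.shuffle.partitions", numPartitions)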
Upvotes: -1
Reputation: 354
You can use the df.repartition(numPartitions) method for this. You can decide numPartitions based on the size of the input/intermediate output and pass it to the repartition() method.
df.repartition(numPartitions) or rdd.repartition(numPartitions)
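For example (a minimal sketch - the target rows per partition is just an assumed heuristic, tune it for your row size):
val totalRows = df.count()
val targetRowsPerPartition = 100000L // assumed heuristic
val numPartitions = math.max(1, (totalRows / targetRowsPerPartition).toInt)
val balanced = df.repartition(numPartitions)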
Upvotes: -4