K.Tom

Reputation: 185

How to set dynamic spark.sql.shuffle.partitions in pyspark?

I want to set the Spark (v2.3) configuration spark.sql.shuffle.partitions dynamically, and have this configuration used across multiple Spark applications.

code:

Spark configuration
===================
 conf = SparkConf() \
  .setAppName("My App") \
  .set('spark.executor.memory', '7g') \
  .set('spark.executor.instances', '15') \
  .set('spark.executor.cores', '4') \
  .set('spark.yarn.executor.memoryOverhead', '3098m') \
  .set('spark.sql.shuffle.partitions', '1500') \
  .set('fs.s3.multipart.uploads.enabled', 'true')
 spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

 empsql = 'Select * From Employee'  # Only 30 records and 40 columns
 df = spark.sql(empsql)  # Spark is configured
 # delimiter/header options do not apply to parquet, so they are omitted here
 df.coalesce(2).write.mode('overwrite').format("parquet").save(s3_path)  # coalesce cannot be changed to repartition due to restrictions
**Error:** Spark out of memory issues

**Resolved:** by changing the above Spark configuration to .set('spark.sql.shuffle.partitions', '2')

For the data frame above, the issue is resolved by changing to .set('spark.sql.shuffle.partitions', '2'). The problem is that this value does not work for Spark applications with a million or more records, which require .set('spark.sql.shuffle.partitions', '1500').

How to resolve this issue and how to make it dynamic?

Upvotes: 0

Views: 9134

Answers (1)

Som

Reputation: 6323

Initial answer from comments -

Actually, setting 'spark.sql.shuffle.partitions' to num_partitions is already the dynamic way to override the default shuffle-partition setting. The task here is to choose the best possible num_partitions. Approaches for choosing the best numPartitions can be -

  1. based on the cluster resources
  2. based on the data size on which you want to apply this property
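The second approach above can be sketched as a small sizing heuristic: pick roughly one shuffle partition per fixed chunk of shuffled data, clamped to a sane range. This is an illustrative sketch, not a Spark API; the function name, the 128 MB target, and the clamp bounds are all assumptions to tune for your cluster.

```python
import math

def pick_shuffle_partitions(shuffle_bytes,
                            target_partition_bytes=128 * 1024**2,
                            min_partitions=2,
                            max_partitions=2000):
    """Heuristic: aim for ~128 MB per shuffle partition, clamped to
    [min_partitions, max_partitions]. All thresholds are assumptions."""
    estimated = math.ceil(shuffle_bytes / target_partition_bytes)
    return max(min_partitions, min(estimated, max_partitions))

# A 30-row table is tiny, so the heuristic lands on the floor value:
print(pick_shuffle_partitions(10_000))          # -> 2
# A 200 GiB shuffle gets many more partitions:
print(pick_shuffle_partitions(200 * 1024**3))   # -> 1600
```

The hard part in practice is estimating shuffle_bytes; a rough proxy is the on-disk size of the input the query reads.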

Update-1

Question from the author - by adding .set('spark.sql.shuffle.partitions', num_partitions), will it calculate the number of partitions dynamically based on the data size, e.g. taking up to 1500 partitions?

@K.Tom, this will just set (overriding the default of 200) the property spark.sql.shuffle.partitions to num_partitions. The same value is used whenever a shuffle happens (an Exchange in the Spark plan). Note that if your cluster does not have enough resources, the operation may fail. Also, if your cluster has the resources to accommodate num_partitions but the dataset is not huge, most of the partitions may end up empty, which is itself overhead to maintain (scheduling, processing, and keeping the metadata of all these partitions). To conclude, setting the property spark.sql.shuffle.partitions is a mixture of art and science.
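One useful detail for making this dynamic: spark.sql.shuffle.partitions is a SQL configuration, so it can also be changed at runtime on a live session via spark.conf.set, per application and even per job, without rebuilding the SparkConf. A minimal sketch, assuming a running SparkSession and that some size estimate for the input is available (the estimation itself is left to the caller):

```python
# Runtime-configuration sketch; requires a live SparkSession named `spark`.
# The choice of 2 vs 1500 mirrors the values from the question; the
# size threshold is an assumption to tune.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("My App").getOrCreate()

estimated_rows = 30  # plug in your own per-application estimate
num_partitions = 2 if estimated_rows < 1_000_000 else 1500

# Takes effect for subsequent shuffles in this session:
spark.conf.set("spark.sql.shuffle.partitions", str(num_partitions))

df = spark.sql("Select * From Employee")
df.coalesce(2).write.mode("overwrite").format("parquet").save(s3_path)
```

This way each application (or each stage of one application) can pick its own value instead of hard-coding it in the shared configuration.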

Upvotes: 2
