Reputation: 1428
I have a large dataframe which I created with 800 partitions.
df.rdd.getNumPartitions()
800
When I use dropDuplicates on the dataframe, it changes the number of partitions to the default of 200:
df = df.dropDuplicates()
df.rdd.getNumPartitions()
200
This behaviour causes problems for me, as it leads to out-of-memory errors.
Do you have any suggestion on fixing this problem? I tried setting spark.sql.shuffle.partitions to 800 but it doesn't work. Thanks
Upvotes: 6
Views: 10925
Reputation: 330353
This happens because dropDuplicates requires a shuffle. If you want to get a specific number of partitions, you should set spark.sql.shuffle.partitions (its default value is 200):
df = sc.parallelize([("a", 1)]).toDF()
df.rdd.getNumPartitions()
## 8
df.dropDuplicates().rdd.getNumPartitions()
## 200
sqlContext.setConf("spark.sql.shuffle.partitions", "800")
df.dropDuplicates().rdd.getNumPartitions()
## 800
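If you are on Spark 2.x and the sqlContext call does not seem to take effect, the same option can be set on the SparkSession before calling dropDuplicates (a minimal sketch, assuming a SparkSession named spark):
spark.conf.set("spark.sql.shuffle.partitions", "800")  # applies to subsequent shuffles
df.dropDuplicates().rdd.getNumPartitions()
## 800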
An alternative approach (Spark 1.6+) is to repartition first:
df.repartition(801, *df.columns).dropDuplicates().rdd.getNumPartitions()
## 801
It is slightly more flexible but less efficient because it doesn't perform local aggregation.
Upvotes: 10
Reputation: 1428
I found the solution at "Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame".
Use reduceByKey instead of dropDuplicates. reduceByKey also has an option for specifying the number of partitions of the resulting RDD.
The downside of using reduceByKey in this case is that it is slow; see the sketch below.
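A minimal sketch of what that can look like, assuming exact-row duplicates, hashable column values, and a target of 800 partitions (the target count is an assumption for illustration):
keyed = df.rdd.map(lambda row: (tuple(row), row))                # key each row by its full contents
deduped = keyed.reduceByKey(lambda a, b: a, numPartitions=800)   # keep one row per key, fix the partition count
result = sqlContext.createDataFrame(deduped.map(lambda kv: kv[1]), df.schema)
result.rdd.getNumPartitions()
## 800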
Upvotes: 0