Michael

Reputation: 1428

Using dropDuplicates on a DataFrame changes the number of partitions

I have a large dataframe which I created with 800 partitions.

df.rdd.getNumPartitions()
800

When I use dropDuplicates on the dataframe, it changes the number of partitions to the default of 200:

df = df.dropDuplicates()
df.rdd.getNumPartitions()
200

This behaviour causes a problem for me, as it leads to out-of-memory errors.

Do you have any suggestions for fixing this problem? I tried setting spark.sql.shuffle.partitions to 800 but it doesn't work. Thanks

Upvotes: 6

Views: 10925

Answers (2)

zero323

Reputation: 330353

This happens because dropDuplicates requires a shuffle. If you want a specific number of partitions, you should set spark.sql.shuffle.partitions (its default value is 200):

df = sc.parallelize([("a", 1)]).toDF()
df.rdd.getNumPartitions()
## 8

df.dropDuplicates().rdd.getNumPartitions()
## 200

sqlContext.setConf("spark.sql.shuffle.partitions", "800")

df.dropDuplicates().rdd.getNumPartitions()
## 800

An alternative approach (Spark 1.6+) is to repartition first:

df.repartition(801, *df.columns).dropDuplicates().rdd.getNumPartitions()
## 801

It is slightly more flexible but less efficient, because it doesn't perform local aggregation.
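To see where the shuffle comes from in each variant you can compare the physical plans (a sketch, assuming a Spark 1.6 DataFrame; the exact plan output differs between versions):

# Sketch: compare the physical plans of the two approaches
df.dropDuplicates().explain()                                 # shuffle sized by spark.sql.shuffle.partitions
df.repartition(801, *df.columns).dropDuplicates().explain()   # explicit shuffle on the chosen columns first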

Upvotes: 10

Michael

Reputation: 1428

I found the solution at "Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame".

Use reduceByKey instead of dropDuplicates. reduceByKey also has an option to specify the number of partitions for the final RDD.
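A minimal sketch of that approach (assuming whole rows with hashable values define a duplicate, 800 as the target partition count, and the same sqlContext as above):

# Key each row by its values, keep one row per key, and fix the partition count
deduped_rdd = (df.rdd
    .map(lambda row: (tuple(row), row))              # whole row as the key
    .reduceByKey(lambda a, b: a, numPartitions=800)  # keep the first row seen per key
    .values())

deduped = sqlContext.createDataFrame(deduped_rdd, df.schema)
deduped.rdd.getNumPartitions()
## 800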

The downside of using reduceByKey in this case is that it is slower.

Upvotes: 0
