Reputation: 1598
I have a Spark DataFrame that has already been repartitioned by column x:
df2 = df1.repartition("x")
I would like to drop duplicates by x and another column without shuffling, since the shuffle is extremely slow in this particular case.
df3 = df2.dropDuplicates(subset=["x","y"])
Edit: Clearly the existing implementation of dropDuplicates does not avoid a shuffle. Is there a way to achieve a similar result using SQL window functions over y, assuming the data has already been partitioned by x?
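Something like the following sketch, perhaps, if the window can reuse the existing partitioning (the ordering column here is just to make the surviving row deterministic):

from pyspark.sql import Window
from pyspark.sql import functions as F

# keep one row per (x, y) pair
w = Window.partitionBy("x", "y").orderBy("z")
df3 = (df2
    .withColumn("rn", F.row_number().over(w))
    .where(F.col("rn") == 1)
    .drop("rn"))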
Upvotes: 0
Views: 1965
Reputation: 9425
I think a shuffle will happen, but only to carry out repartition("x"). The subsequent dropDuplicates() will then sort each partition by key ["x", "y"] and aggregate, taking the first row for each key. Since all rows for a given x are already in one partition, no additional shuffling will be required.
UPDATE
Let's run a quick test:
[user@gateway ~]# pyspark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0-cdh6.x-SNAPSHOT
      /_/
Using Python version 2.7.5 (default, Apr 9 2019 14:30:50)
SparkSession available as 'spark'.
>>> df1 = spark.createDataFrame([{'x': 1, 'y': 1, 'z': 1},{'x': 1, 'y': 1, 'z': 2},{'x': 1, 'y': 2, 'z': 2},{'x': 2, 'y': 1, 'z': 1}])
>>> df1.printSchema()
root
|-- x: long (nullable = true)
|-- y: long (nullable = true)
|-- z: long (nullable = true)
>>> df2 = df1.repartition("x")
>>> df3 = df2.dropDuplicates(subset=["x","y"])
>>> df3.explain()
== Physical Plan ==
*(1) HashAggregate(keys=[x#0L, y#1L], functions=[first(z#2L, false)])
+- *(1) HashAggregate(keys=[x#0L, y#1L], functions=[partial_first(z#2L, false)])
   +- Exchange hashpartitioning(x#0L, 200)
      +- Scan ExistingRDD[x#0L,y#1L,z#2L]
>>> df3.show()
+---+---+---+
| x| y| z|
+---+---+---+
| 1| 2| 2|
| 1| 1| 1|
| 2| 1| 1|
+---+---+---+
>>>
The plan shows a single Exchange operator (the shuffle on x), followed by HashAggregates that operate within each partition to compute partial_first for each (x, y) pair and take the first row (without even sorting).
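To address the edit: you can also express this with a window over the existing x-partitions. Since the data is already hash-partitioned by x, a window partitioned by x should only add a local sort, not a second Exchange. A minimal sketch of the idea (my own, assuming y is non-null; a null-safe comparison would need extra care):

from pyspark.sql import Window
from pyspark.sql import functions as F

# Window over the existing x-partitions; ordering by y makes
# duplicate (x, y) rows adjacent within each partition.
w = Window.partitionBy("x").orderBy("y")

df3 = (df2
    .withColumn("prev_y", F.lag("y").over(w))  # y of the previous row, null for the first row
    .where(F.col("prev_y").isNull() | (F.col("prev_y") != F.col("y")))  # keep the first row of each (x, y) run
    .drop("prev_y"))

Run df3.explain() to verify that the only Exchange in the plan is the hashpartitioning on x from the original repartition.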
Upvotes: 4