I have a dataset ds like this (output of ds.show()):
+---+---+---+-----+
|id1|id2|id3|value|
+---+---+---+-----+
|1  |1  |2  |tom  |
|1  |1  |2  |tim  |
|1  |3  |2  |tom  |
|1  |3  |2  |tom  |
|2  |1  |2  |mary |
+---+---+---+-----+
I want to remove all rows that duplicate the key (id1, id2, id3) with different values (rows 1 and 2), but at the same time keep a single row when the duplicated rows carry the same value (rows 3 and 4). The expected output is:
+---+---+---+-----+
|id1|id2|id3|value|
+---+---+---+-----+
|1  |3  |2  |tom  |
|2  |1  |2  |mary |
+---+---+---+-----+
Here rows 1 and 2 should be removed because there are two different values for their key group, while rows 3 and 4 are kept as a single row (instead of being removed) because their value is the same.
I tried to achieve this using:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1, 1, 2, "tom"),
  (1, 1, 2, "tim"),
  (1, 3, 2, "tom"),
  (1, 3, 2, "tom"),
  (2, 1, 2, "mary")
).toDF("id1", "id2", "id3", "value")

val window = Window.partitionBy("id1", "id2", "id3")

df.withColumn("count", count("value").over(window))
  .filter($"count" < 2)
  .drop("count")
  .show(false)
Here is the related question: Spark: remove all duplicated lines
But it doesn't work as expected, because it removes all duplicated rows, including rows 3 and 4.
The reason I want to do this is to join with another dataset, without adding information from that dataset when there are multiple names for the same key group.
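For context, this is roughly the join I have in mind (a sketch only; the second dataset and its info column are hypothetical):

// The expected output above, as a standalone dataset.
val deduped = Seq((1, 3, 2, "tom"), (2, 1, 2, "mary"))
  .toDF("id1", "id2", "id3", "value")

// Hypothetical dataset to enrich with, keyed by the same ids.
val other = Seq((1, 1, 2, "info-a"), (1, 3, 2, "info-b"))
  .toDF("id1", "id2", "id3", "info")

// Key (1, 1, 2) had two names and was dropped beforehand,
// so "info-a" is never attached to an ambiguous key group.
deduped.join(other, Seq("id1", "id2", "id3"), "left").show(false)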
Upvotes: 1
Views: 581
You can use distinct to keep just one row when rows are fully duplicated; the window count then no longer sees rows 3 and 4 as two separate rows:
df.distinct
  .withColumn("count", count("value").over(window))
  .filter($"count" < 2)
  .drop("count")
  .show(false)
+---+---+---+-----+
|id1|id2|id3|value|
+---+---+---+-----+
|1 |3 |2 |tom |
|2 |1 |2 |mary |
+---+---+---+-----+
You can also use the groupBy method:
df.groupBy("id1", "id2", "id3", "value")
  .agg(first("col1").as("col1"), ...) // stands in for aggregating any remaining columns; this example df has none
  .withColumn("count", count("value").over(window))
  .filter($"count" < 2)
  .drop("count")
  .show(false)
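If you don't need any other columns, the window can be avoided entirely. A minimal sketch of the same logic using only groupBy and collect_set (my variant, not part of the original answer; it assumes the same imports as above):

df.groupBy("id1", "id2", "id3")
  .agg(collect_set("value").as("values"))  // distinct values per key group
  .filter(size($"values") === 1)           // keep groups with exactly one distinct value
  .select($"id1", $"id2", $"id3", $"values"(0).as("value"))
  .show(false)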
Upvotes: 0
You can drop duplicates before applying the window count, which leaves a single record for rows 3 and 4, as below:
df.dropDuplicates()
  .withColumn("count", count("value").over(window))
  .filter($"count" < 2)
  .drop("count")
  .show(false)
You can also specify the fields to be checked for duplicates:
df.dropDuplicates("id1", "id2", "id3", "value")
  .withColumn("count", count("value").over(window))
  .filter($"count" < 2)
  .drop("count")
  .show(false)
Output:
+---+---+---+-----+
|id1|id2|id3|value|
+---+---+---+-----+
|1 |3 |2 |tom |
|2 |1 |2 |mary |
+---+---+---+-----+
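A single-pass variant of the same idea is also possible (my sketch, not part of the original answer): counting distinct values per key with collect_set over the window means exact duplicates never inflate the count, so de-duplication is only needed once at the end.

df.withColumn("distinct", size(collect_set("value").over(window))) // distinct values per key
  .filter($"distinct" === 1)           // drop key groups with conflicting values
  .dropDuplicates("id1", "id2", "id3") // collapse the remaining exact duplicates
  .drop("distinct")
  .show(false)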
Upvotes: 2