hawarden_

Reputation: 2170

Spark: remove duplicated rows with different values, but keep one row when the duplicates share the same value

I have a dataset ds like this:

ds.show():

id1 | id2 | id3 | value  |
1   | 1   | 2   | tom    |
1   | 1   | 2   | tim    |
1   | 3   | 2   | tom    |
1   | 3   | 2   | tom    |
2   | 1   | 2   | mary   |

I want to remove all rows that are duplicated on the keys (id1, id2, id3) but have different values (i.e. rows 1 and 2), while keeping a single row when the duplicates share the same value (i.e. rows 3 and 4). The expected output is:

id1 | id2 | id3 | value  |
1   | 3   | 2   | tom    |
2   | 1   | 2   | mary   |

Here rows 1 and 2 should be removed because there are two different values for that key group, but only one of rows 3 and 4 should be kept because their value is the same (instead of removing both rows).

I tried to achieve this with:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1, 1, 2, "tom"),
  (1, 1, 2, "tim"),
  (1, 3, 2, "tom"),
  (1, 3, 2, "tom"),
  (2, 1, 2, "mary")
).toDF("id1", "id2", "id3", "value")

val window = Window.partitionBy("id1", "id2", "id3")

df.withColumn("count", count("value").over(window))
  .filter($"count" < 2)
  .drop("count")
  .show(false)

Here is the related question: Spark: remove all duplicated lines

But it doesn't work as expected: count over the window counts rows rather than distinct values, so it removes all duplicated rows, including rows 3 and 4.
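
In other words, what I need is the number of distinct values per key group. A minimal sketch of that idea, assuming Spark 2.x+ where collect_set can be used as a window function (countDistinct cannot be):

// size of the distinct-value set per key group
df.withColumn("n", size(collect_set("value").over(window)))
  .filter($"n" === 1)   // keep groups with a single distinct value
  .drop("n")
  .distinct()           // collapse identical duplicates to one row
  .show(false)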

The reason I want to do this is to join with another dataset, and to avoid adding information from that dataset when there are multiple names for the same key group.

Upvotes: 1

Views: 581

Answers (2)

Lamanus

Reputation: 13581

You can use distinct to keep just one row when rows are fully duplicated.

df.distinct
  .withColumn("count", count("value").over(window))
  .filter($"count" < 2)
  .drop("count")
  .show(false)

+---+---+---+-----+
|id1|id2|id3|value|
+---+---+---+-----+
|1  |3  |2  |tom  |
|2  |1  |2  |mary |
+---+---+---+-----+

You can also use the groupBy method, aggregating any remaining non-key columns with first (the ... is a placeholder for those columns; this example has none).

df.groupBy("id1", "id2", "id3", "value")
  .agg(first("col1").as("col1"), ...)
  .withColumn("count", count("value").over(window))
  .filter($"count" < 2)
  .drop("count")
  .show(false)
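
Since this example has no extra columns to aggregate, a concrete, runnable form of that groupBy variant (using the grouped count only to materialize the de-duplication) would be:

df.groupBy("id1", "id2", "id3", "value")
  .count()            // grouping on all columns de-duplicates
  .drop("count")      // the group size itself is not needed
  .withColumn("count", count("value").over(window))
  .filter($"count" < 2)
  .drop("count")
  .show(false)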

Upvotes: 0

koiralo

Reputation: 23119

You can drop duplicates before applying the window count, which leaves a single record for fully duplicated rows, as below:

df.dropDuplicates()
  .withColumn("count", count("value").over(window))
  .filter($"count" < 2)
  .drop("count")
  .show(false)

You can also specify the fields to be checked for duplicates:

df.dropDuplicates("id1", "id2", "id3", "value")
  .withColumn("count", count("value").over(window))
  .filter($"count" < 2)
  .drop("count")
  .show(false)

Output:

+---+---+---+-----+
|id1|id2|id3|value|
+---+---+---+-----+
|1  |3  |2  |tom  |
|2  |1  |2  |mary |
+---+---+---+-----+
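
Given the stated goal of joining with another dataset, a minimal sketch of how the de-duplicated result could feed that join; the other dataset and its info column are hypothetical:

// hypothetical lookup dataset, for illustration only
val other = Seq((1, 3, 2, "extra"), (1, 1, 2, "ignored"))
  .toDF("id1", "id2", "id3", "info")

val deduped = df.dropDuplicates()
  .withColumn("count", count("value").over(window))
  .filter($"count" < 2)
  .drop("count")

// ambiguous key groups were removed, so info is never attached
// to a key that maps to several names
deduped.join(other, Seq("id1", "id2", "id3"), "left").show(false)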

Upvotes: 2
