salmanbw

Reputation: 1311

Remove all records that are duplicated in a Spark DataFrame

I have a Spark DataFrame with multiple columns. I want to find and remove every row whose value in one particular column is duplicated (the other columns may differ).

I tried dropDuplicates(col_name), but it removes only the extra copies and still keeps one record for each duplicated value. What I need is to remove every row whose value occurs more than once, so that none of the duplicated entries remain.

I am using Spark 1.6 and Scala 2.10.

Upvotes: 6

Views: 15451

Answers (4)

AnuragPandey

Reputation: 1

Since you are using a Spark DataFrame, here is an alternative approach: select only the rows whose col_name value occurs exactly once. Note that createOrReplaceTempView and the spark session entry point require Spark 2.0 or later.

df.createOrReplaceTempView("df")

df = spark.sql(
    """select * from df
       where df.col_name in (select col_name from df
                            group by col_name
                            having count(col_name) = 1)"""
)

Upvotes: 0

Powers

Reputation: 19338

I added a killDuplicates() method to the open source spark-daria library that uses @Raphael Roth's solution. Here's how to use the code:

import com.github.mrpowers.spark.daria.sql.DataFrameExt._
import org.apache.spark.sql.functions.col

df.killDuplicates(col("id"))

// you can also supply multiple Column arguments
df.killDuplicates(col("id"), col("another_column"))

Here's the code implementation:

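import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count}

object DataFrameExt {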

  implicit class DataFrameMethods(df: DataFrame) {

    def killDuplicates(cols: Column*): DataFrame = {
      df
        .withColumn(
          "my_super_secret_count",
          count("*").over(Window.partitionBy(cols: _*))
        )
        .where(col("my_super_secret_count") === 1)
        .drop(col("my_super_secret_count"))
    }

  }

}

You might want to leverage the spark-daria library to keep this logic out of your codebase.

Upvotes: 2

Raphael Roth

Reputation: 27383

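I would use window functions for this. Let's say you want to remove all rows whose id is duplicated:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.count
// the $"..." syntax needs the implicits of your SQLContext (or SparkSession) in scope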

df
  .withColumn("cnt", count("*").over(Window.partitionBy($"id")))
  .where($"cnt"===1).drop($"cnt")
  .show()
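
For reference, here is a self-contained sketch of the window approach above, run on a small made-up DataFrame. It assumes Spark 2.0 or later (it uses SparkSession, which Spark 1.6 does not have). The object name, the dropAllDuplicates helper, the temporary column name "cnt" and the sample data are all illustrative and not part of any library.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count}

object DropAllDuplicates {

  // Hypothetical helper: removes every row whose key columns occur more than once.
  // Assumes the input has no column called "cnt" already.
  def dropAllDuplicates(df: DataFrame, keys: Column*): DataFrame =
    df
      // attach to each row the number of rows sharing its key
      .withColumn("cnt", count("*").over(Window.partitionBy(keys: _*)))
      // keep only rows whose key is unique
      .where(col("cnt") === 1)
      // remove the helper column again
      .drop("cnt")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("drop-all-duplicates")
      .getOrCreate()
    import spark.implicits._

    // made-up sample data: id 4 appears twice
    val df = Seq((1, 1), (2, 2), (3, 3), (4, 4), (4, 5)).toDF("id", "num")

    dropAllDuplicates(df, col("id")).orderBy("id").show()

    spark.stop()
  }
}
```

With this sample data, both rows with id 4 are dropped and the rows with ids 1, 2 and 3 remain.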

Upvotes: 11

Shaido

Reputation: 28392

This can be done by grouping by the column (or columns) in which to look for duplicates, and then aggregating and filtering the results.

Example dataframe df:

+---+---+
| id|num|
+---+---+
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
|  4|  5|
+---+---+

Grouping by the id column to remove its duplicates (the last two rows):

val df2 = df.groupBy("id")
  .agg(first($"num").as("num"), count($"id").as("count"))
  .filter($"count" === 1)
  .select("id", "num")

This will give you:

+---+---+
| id|num|
+---+---+
|  1|  1|
|  2|  2|
|  3|  3|
+---+---+

Alternatively, it can be done with a join. It will be slower, but if there are many columns there is no need to use first($"num").as("num") for each one to keep them.

val df2 = df.groupBy("id").agg(count($"id").as("count")).filter($"count" === 1).select("id")
val df3 = df.join(df2, Seq("id"), "inner")
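
To make the logic of the join variant concrete, here is the same idea sketched on plain Scala collections, with no Spark dependency: count the rows per key, then keep only the rows whose key has a count of 1. The object and method names are made up for illustration.

```scala
object KeepSingletons {

  // Keeps only the rows whose key (first tuple element) occurs exactly once.
  def keepSingletons[K, V](rows: Seq[(K, V)]): Seq[(K, V)] = {
    // Step 1: count rows per key (the groupBy + count step).
    val countsByKey: Map[K, Int] =
      rows.groupBy(_._1).map { case (key, group) => key -> group.size }
    // Step 2: keep rows whose key has count 1 (the filter + inner join step).
    rows.filter { case (key, _) => countsByKey(key) == 1 }
  }

  def main(args: Array[String]): Unit = {
    // same rows as the example DataFrame, as (id, num) tuples
    val rows = Seq((1, 1), (2, 2), (3, 3), (4, 4), (4, 5))
    println(keepSingletons(rows)) // List((1,1), (2,2), (3,3))
  }
}
```

Unlike the first($"num") version, this keeps every column of the surviving rows untouched, which is the point of the join.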

Upvotes: 2
