Reputation: 441
A DataFrame has 3 million records. I am trying to move only the duplicate records to a separate DataFrame. I am using Spark 1.6 with Scala.
Data:
IM,A-15ACWSSC,CP
IM,A-15ACWSSC,CP
IM,AK11-130BA,13MM BLK RUBBER CAB
New DataFrame
IM,A-15ACWSSC,CP
IM,A-15ACWSSC,CP
Code I have used:
var df = Seq(
("IM", "A-15ACWSSC", "ASSY 1.5V2", "CP"),
("IM", "A-15ACWSSC", "ASSY 1.5V2", "CP"),
("IN", "A-15ACWSSC", "ASSY 1.6V2", "CP1"),
("IN", "A-15ACWSSC", "ASSY 1.7V2", "CP2")
).toDF("COL1", "COL2", "COL3", "COL4")
df.show()
// +----+----------+----------+----+
// |COL1| COL2| COL3|COL4|
// +----+----------+----------+----+
// | IM|A-15ACWSSC|ASSY 1.5V2| CP|
// | IM|A-15ACWSSC|ASSY 1.5V2| CP|
// | IN|A-15ACWSSC|ASSY 1.6V2| CP1|
// | IN|A-15ACWSSC|ASSY 1.7V2| CP2|
// +----+----------+----------+----+
df.registerTempTable("CLEANFRAME")
val CleanData = sqlContext.sql(
"""select COL1,COL2,COL3,COL4
from
(SELECT COL1,COL2,COL3,COL4, count(1) over (partition by COL1,COL2,COL3,COL4) as Uniqueid
FROM CLEANFRAME)
where Uniqueid > 1
""").cache()
CleanData.show
But it is not giving any result. Please help if I am missing anything.
Upvotes: 1
Views: 226
Reputation: 1157
Your query should be modified as below; every column should be included in the GROUP BY.
Edit: used windowing instead, so the duplicate records themselves are preserved. A GROUP BY sketch follows for contrast.
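For reference, a minimal GROUP BY version (my illustration, not part of the original answer; it assumes the CLEANFRAME temp table registered further below). It finds the duplicated key combinations but returns only one row per group, which is exactly why it cannot move the duplicate rows themselves:

// Sketch only: collapses each duplicate group to a single row with its count.
val dupKeys = sqlContext.sql(
  """SELECT COL1, COL2, COL3, COL4, count(*) AS cnt
     FROM CLEANFRAME
     GROUP BY COL1, COL2, COL3, COL4
     HAVING count(*) > 1""")

The windowed version below keeps every duplicate row instead of collapsing them: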
var df = Seq(
("IM","A-15ACWSSC","ASSY 1.5V2","CP"),
("IM","A-15ACWSSC","ASSY 1.5V2","CP"),
("IN","A-15ACWSSC","ASSY 1.6V2","CP1"),
("IN","A-15ACWSSC","ASSY 1.7V2","CP2")
).toDF("COL1", "COL2","COL3","COL4")
df.show()
// +----+----------+----------+----+
// |COL1| COL2| COL3|COL4|
// +----+----------+----------+----+
// | IM|A-15ACWSSC|ASSY 1.5V2| CP|
// | IM|A-15ACWSSC|ASSY 1.5V2| CP|
// | IN|A-15ACWSSC|ASSY 1.6V2| CP1|
// | IN|A-15ACWSSC|ASSY 1.7V2| CP2|
// +----+----------+----------+----+
df.createOrReplaceTempView("CLEANFRAME")
val CleanData = sqlContext.sql("""select COL1,COL2,COL3,COL4
from
(SELECT COL1,COL2,COL3,COL4, count(1) over (partition by COL1,COL2,COL3,COL4) as Uniqueid
FROM CLEANFRAME)
where Uniqueid > 1
""" ).cache()
Error:
Exception in thread "main" java.lang.RuntimeException: [3.79] failure: ``)'' expected but `(' found
(SELECT COL1,COL2,COL3,COL4, count(1) over (partition by COL1,COL2,COL3,COL4) as Uniqueid
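This failure happens because, in Spark 1.6, window functions are only supported through a HiveContext; the plain SQLContext parser rejects the OVER (...) clause with exactly this message. Also note that createOrReplaceTempView is the Spark 2.x name; registerTempTable is the 1.6 equivalent. A minimal working 1.6 sketch (my adaptation, assuming sc is the SparkContext):

import org.apache.spark.sql.hive.HiveContext

// Window functions in Spark 1.6 SQL need a HiveContext, not a plain SQLContext.
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

val df = Seq(
  ("IM", "A-15ACWSSC", "ASSY 1.5V2", "CP"),
  ("IM", "A-15ACWSSC", "ASSY 1.5V2", "CP"),
  ("IN", "A-15ACWSSC", "ASSY 1.6V2", "CP1"),
  ("IN", "A-15ACWSSC", "ASSY 1.7V2", "CP2")
).toDF("COL1", "COL2", "COL3", "COL4")

df.registerTempTable("CLEANFRAME")

// The alias `t` on the subquery is also required by the parser.
val cleanData = hiveContext.sql(
  """SELECT COL1, COL2, COL3, COL4
     FROM (SELECT COL1, COL2, COL3, COL4,
                  count(1) OVER (PARTITION BY COL1, COL2, COL3, COL4) AS Uniqueid
           FROM CLEANFRAME) t
     WHERE Uniqueid > 1""").cache()

cleanData.show()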
Upvotes: 1
Reputation: 1892
You can try this:
scala> import org.apache.spark.sql.expressions.Window
scala> import org.apache.spark.sql.functions._
scala> var win = Window.partitionBy("a","b","c","d").orderBy("a")
scala> var dff = Seq(("IM","A-15ACWSSC","ASSY 1.5V2","CP"), ("IM","A-15ACWSSC","ASSY 1.5V2","CP"), ("IM","AK11-130BA","13MM BLK RUBBER CAB FOOT","ap")).toDF("a","b","c","d")
scala> dff.show
+---+----------+--------------------+---+
| a| b| c| d|
+---+----------+--------------------+---+
| IM|A-15ACWSSC| ASSY 1.5V2| CP|
| IM|A-15ACWSSC| ASSY 1.5V2| CP|
| IM|AK11-130BA|13MM BLK RUBBER C...| ap|
+---+----------+--------------------+---+
To find the duplicates, add a count over the window and filter the rows whose count is >= 2:
scala> var dff_dup = dff.withColumn("test",count("*").over(win)).filter($"test">=2)
scala> dff_dup.show
+---+----------+----------+---+----+
| a| b| c| d|test|
+---+----------+----------+---+----+
| IM|A-15ACWSSC|ASSY 1.5V2| CP| 2|
| IM|A-15ACWSSC|ASSY 1.5V2| CP| 2|
+---+----------+----------+---+----+
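If you want the duplicates as their own DataFrame without the helper column (and, optionally, the remaining unique rows as a second DataFrame), a small follow-up sketch of mine, reusing the answer's dff and win:

// Duplicates only, helper column dropped.
scala> val dff_dup = dff.withColumn("test", count("*").over(win)).filter($"test" >= 2).drop("test")
// Rows that appear exactly once.
scala> val dff_uniq = dff.withColumn("test", count("*").over(win)).filter($"test" < 2).drop("test")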
Upvotes: 0