Reputation: 5892
Given the following code:
case class Contact(name: String, phone: String)
case class Person(name: String, ts:Long, contacts: Seq[Contact])
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
val people = sqlContext.read.format("orc").load("people")
What is the best way to dedupe users by timestamp, so that only the record with the max ts for each user stays in the collection? In Spark, using an RDD, I would run something like this:
rdd.reduceByKey(_ maxTS _)
and would add the maxTS method to Person or add implicits ...
def maxTS(that: Person): Person =
  if (that.ts > ts) that else this
Is it possible to do the same with DataFrames, and would the performance be similar? We are using Spark 1.6.
Upvotes: 1
Views: 2528
Reputation: 46
You can do a groupBy and use your preferred aggregation method like sum, max etc.
df.groupBy($"name").agg(max($"ts").alias("maxTS"))
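Note that a groupBy with max only returns the key and the aggregated value, so the contacts column is lost. One way to get the full rows back is to join the aggregate onto the original DataFrame. The following is a rough sketch against the Spark 1.6 DataFrame API, assuming the key is name; latestPerName is a made-up helper name, not something from the question. If two rows for the same name share the maximum ts, both are kept.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max}

// Hypothetical helper: keep the full row(s) with the largest ts per name.
def latestPerName(df: DataFrame): DataFrame = {
  // One row per name, holding that name's maximum ts.
  val latest = df.groupBy(col("name")).agg(max(col("ts")).alias("ts"))
  // Equi-join on both columns keeps only rows whose ts equals the maximum.
  df.join(latest, Seq("name", "ts"))
}
```

With the DataFrame from the question this would be called as latestPerName(people).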
Upvotes: 0
Reputation: 16076
You can use window functions. I'm assuming that the key is name:
import org.apache.spark.sql.functions.{rowNumber, max, broadcast}
import org.apache.spark.sql.expressions.Window
val df = people // already a DataFrame in the question; otherwise convert to one first
val win = Window.partitionBy('name).orderBy('ts.desc)
df.withColumn("personRank", rowNumber.over(win))
.where('personRank === 1).drop("personRank")
Within each group of rows sharing the same name, this assigns every row a unique personRank; the row with the latest ts gets the lowest rank, equal to 1. You keep only those rows and then drop the temporary rank column.
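To make the semantics concrete, here is a small Spark-free model on plain Scala collections. It is only a sketch: the case classes mirror the ones in the question, while DedupeSketch, latestByWindow and latestByReduce are names invented for illustration. It shows that "partition by name, order by ts descending, keep rank 1" selects the same records as the reduceByKey approach with maxTS.

```scala
// Plain-Scala model of the dedupe (no Spark needed); field names follow the question.
case class Contact(name: String, phone: String)
case class Person(name: String, ts: Long, contacts: Seq[Contact])

object DedupeSketch {
  // Window-style: partition by name, order by ts descending, keep the first row.
  def latestByWindow(people: Seq[Person]): Seq[Person] =
    people
      .groupBy(_.name)                                   // like partitionBy('name)
      .values
      .map(_.sortBy(_.ts)(Ordering[Long].reverse).head)  // like orderBy('ts.desc), rank 1
      .toSeq
      .sortBy(_.name)                                    // fixed output order for comparison

  // reduceByKey-style: combine each key's records pairwise, keeping the larger ts.
  def latestByReduce(people: Seq[Person]): Seq[Person] =
    people
      .groupBy(_.name)
      .values
      .map(_.reduce((a, b) => if (b.ts > a.ts) b else a))
      .toSeq
      .sortBy(_.name)
}
```

For a list with two records for one name and one record for another, both functions return one record per name, the one with the highest ts, with its contacts intact.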
Upvotes: 2