Julias

Reputation: 5892

What is the equivalent of reduceByKey for DataFrames?

Given the following code:

case class Contact(name: String, phone: String)
case class Person(name: String, ts:Long, contacts: Seq[Contact])

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
val people = sqlContext.read.format("orc").load("people")

What is the best way to dedupe users by their timestamp, so that only the user with the max ts stays in the collection? With RDDs in Spark I would run something like this:

rdd.reduceByKey(_ maxTS _) 

and would add the maxTS method to Person, or add it via implicits ...

def maxTS(that: Person): Person =
  if (that.ts > ts) that else this
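For context, the full RDD version would look roughly like this (a sketch; keying by name is my assumption about the dedup key):

// assuming rdd is an RDD[Person] and name is the dedup key
val deduped = rdd
  .map(p => (p.name, p))    // key by name so duplicates land in the same group
  .reduceByKey(_ maxTS _)   // keep the Person with the larger ts
  .values                   // back to an RDD[Person]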

Is it possible to do the same with DataFrames, and will the performance be similar? We are using Spark 1.6.

Upvotes: 1

Views: 2528

Answers (2)

Swadhin Shahriar

Reputation: 46

You can do a groupBy and use your preferred aggregation method, such as sum, max, etc.:

df.groupBy($"name").agg(sum($"tx").alias("maxTS"))

Upvotes: 0

T. Gawęda

Reputation: 16076

You can use window functions. I'm assuming that the key is name:

import org.apache.spark.sql.functions.rowNumber // called row_number in Spark 2.x
import org.apache.spark.sql.expressions.Window

val df = people // the DataFrame read in the question

val win = Window.partitionBy('name).orderBy('ts.desc)
df.withColumn("personRank", rowNumber.over(win))
  .where('personRank === 1)
  .drop("personRank")

For each person it will create a personRank: every row with a given name gets a unique number, and the row with the latest ts gets the lowest rank, equal to 1. Then you drop the temporary rank column.
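If ties on ts matter, note that rowNumber keeps exactly one row per name even when two rows share the latest ts. A sketch of an alternative that keeps such ties, using max over the same partition:

import org.apache.spark.sql.functions.max

val byName = Window.partitionBy('name)         // whole-partition frame, no ordering needed
df.withColumn("maxTs", max('ts).over(byName))  // per-name maximum timestamp
  .where('ts === 'maxTs)
  .drop("maxTs")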

Upvotes: 2
