WicleQian

Reputation: 63

scala spark how to get latest day's record

data=
"""
user date      item1 item2
1    2015-12-01 14  5.6
1    2015-12-01 10  0.6
1    2015-12-02 8   9.4
1    2015-12-02 90  1.3
2    2015-12-01 30  0.3
2    2015-12-01 89  1.2
2    2015-12-30 70  1.9
2    2015-12-31 20  2.5
3    2015-12-01 19  9.3
3    2015-12-01 40  2.3
3    2015-12-02 13  1.4
3    2015-12-02 50  1.0
3    2015-12-02 19  7.8
"""

If I have data like the above, how can I get each user's latest day's records? I tried groupByKey:

val user = data.map {
  case (user, date, item1, item2) => ((user, date), Array(item1, item2))
}.groupByKey()

but then I don't know how to deal with the result. Can anyone give me some suggestions? Thanks a lot :)

update:

I changed my data; now a user can have several records on the latest day, and I want to get all of them. Thanks :)

second update:

The result I want is:

user1 (2015-12-02,Array(8,9.4),Array(90,1.3))
user2 (2015-12-31,Array(20,2.5))
user3 (2015-12-02,Array(13,1.4),Array(50,1.0),Array(19,7.8))

and now I have written some code:

import scala.collection.mutable.ArrayBuffer

val data2 = data.trim.split("\\n")
  .filter(!_.startsWith("user"))   // drop the header row
  .map(_.split("\\s+"))
  .map(f => (f(0), ArrayBuffer(f(1), f(2).toInt, f(3).toDouble)))
val data3 = sc.parallelize(data2)
data3.reduceByKey((x, y) =>
  if (x(0).toString.compareTo(y(0).toString) >= 0) x ++= y
  else y
).foreach(println)

result is:

(2,ArrayBuffer(2015-12-31, 20, 2.5))
(1,ArrayBuffer(2015-12-02, 8, 9.4, 2015-12-02, 90, 1.3))
(3,ArrayBuffer(2015-12-02, 13, 1.4, 2015-12-02, 50, 1.0, 2015-12-02, 19, 7.8))

Is there anything I can do to improve it? :)

Upvotes: 4

Views: 4833

Answers (5)

loneStar

Reputation: 4010

This is a classic windowing-functions problem. Partition by user, order by date descending, and apply the rank function: all records on the same day get the same rank, so each user's latest day gets rank 1 and you can keep those records with a simple rank = 1 filter.

val data = sc.textFile("/user/hadoop/data.txt")

val df = data.map(_.split("\\s+"))
  .map(f => (f(0), f(1), f(2).toInt, f(3).toDouble))
  .toDF()

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// order the window by date descending so that each user's latest day gets rank 1
val w = Window.partitionBy("_1").orderBy(desc("_2"))

df.withColumn("Rank", rank().over(w)).show()


+---+----------+---+---+----+
| _1|        _2| _3| _4|Rank|
+---+----------+---+---+----+
|  3|2015-12-02| 13|1.4|   1|
|  3|2015-12-02| 50|1.0|   1|
|  3|2015-12-02| 19|7.8|   1|
|  3|2015-12-01| 19|9.3|   4|
|  3|2015-12-01| 40|2.3|   4|
|  1|2015-12-02|  8|9.4|   1|
|  1|2015-12-02| 90|1.3|   1|
|  1|2015-12-01| 14|5.6|   3|
|  1|2015-12-01| 10|0.6|   3|
|  2|2015-12-31| 20|2.5|   1|
|  2|2015-12-30| 70|1.9|   2|
|  2|2015-12-01| 30|0.3|   3|
|  2|2015-12-01| 89|1.2|   3|
+---+----------+---+---+----+

Now filter the rows where Rank = 1 to keep each user's latest day.
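A minimal sketch of that final step, reusing the df and w defined above (the column names _1 and _2 come from the default toDF() naming):

df.withColumn("Rank", rank().over(w))
  .filter(col("Rank") === 1)   // keep only the latest day per user
  .drop("Rank")
  .show()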

Upvotes: 1

David Maust

Reputation: 8270

I think your best bet is to map your input data to an RDD of tuples of (user, (date, item1, item2)), so the RDD will be userRdd: RDD[(Int, (Date, Int, Double))]

From here you can create a reducer that will take two tuples and produce another of the same format which is the tuple with the greater date value:

def reduceMaxDate(a: (Date, Int, Double), b: (Date, Int, Double)): (Date, Int, Double) = {
  if (a._1.after(b._1)) a else b   // keep the tuple with the later date
}

From here you can find the max value for each user by calling:

userRdd.reduceByKey(reduceMaxDate)

This will yield the tuple with the max timestamp for each user.
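For reference, a minimal self-contained sketch of this approach, assuming data is the multi-line string from the question and sc is the SparkContext; dates are kept as ISO-8601 strings rather than a Date type, so lexicographic comparison matches chronological order:

// parse the raw text into (user, (date, item1, item2)) pairs
val lines = data.trim.split("\n").filterNot(_.startsWith("user"))   // drop the header row
val userRdd = sc.parallelize(lines).map { line =>
  val f = line.split("\\s+")
  (f(0).toInt, (f(1), f(2).toInt, f(3).toDouble))
}

// keep whichever record has the later date
def reduceMaxDate(a: (String, Int, Double), b: (String, Int, Double)): (String, Int, Double) =
  if (a._1 > b._1) a else b

userRdd.reduceByKey(reduceMaxDate).foreach(println)

Note that this keeps a single record per user; to return every record from the latest day (as asked in the update), you would still need a grouping or join step as in the other answers.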

Upvotes: 5

Aravind Yarram

Reputation: 80194

Here is my solution, in the following four steps (copy/paste this into the shell to see the output at each step):

  1. Prepare the data
  2. Find the latest date for each user
  3. Join the original data with the latest date to get the result
  4. Transform the result into the format you desire

//Step 1. Prepare data

val input="""user date      item1 item2
1    2015-12-01 14  5.6
1    2015-12-01 10  0.6
1    2015-12-02 8   9.4
1    2015-12-02 90  1.3
2    2015-12-01 30  0.3
2    2015-12-01 89  1.2
2    2015-12-30 70  1.9
2    2015-12-31 20  2.5
3    2015-12-01 19  9.3
3    2015-12-01 40  2.3
3    2015-12-02 13  1.4
3    2015-12-02 50  1.0
3    2015-12-02 19  7.8
"""
val inputLines=sc.parallelize(input.split("\\r?\\n"))
//filter the header row
val data=inputLines.filter(l=> !l.startsWith("user") )
data.foreach(println)

//Step 2. Find the latest date of each user

val keyByUser=data.map(line => { val a = line.split("\\s+"); ( a(0), line ) })
//For each user, find his latest date
val latestByUser = keyByUser.reduceByKey( (x,y) => if(x.split("\\s+")(1) > y.split("\\s+")(1)) x else y )
latestByUser.foreach(println)

//Step 3. Join the original data with the latest date to get the result

val latestKeyedByUserAndDate = latestByUser.map( x => (x._1 + ":"+x._2.split("\\s+")(1), x._2))
val originalKeyedByUserAndDate = data.map(line => { val a = line.split("\\s+"); ( a(0) +":"+a(1), line ) })
val result=latestKeyedByUserAndDate.join(originalKeyedByUserAndDate)
result.foreach(println)

//Step 4. Transform the result into the format you desire

def createCombiner(v:(String,String)):List[(String,String)] = List[(String,String)](v)
def mergeValue(acc:List[(String,String)], value:(String,String)) : List[(String,String)] = value :: acc
def mergeCombiners(acc1:List[(String,String)], acc2:List[(String,String)]) : List[(String,String)] = acc2 ::: acc1
//use combineByKey
val transformedResult=result.mapValues(l=> { val a=l._2.split(" +"); (a(2),a(3)) } ).combineByKey(createCombiner,mergeValue,mergeCombiners)
transformedResult.foreach(println)
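If you also want the user and date split back out of the combined key (closer to the format shown in the question), a small follow-up map on transformedResult could look like this (just a sketch):

val formatted = transformedResult.map { case (key, items) =>
  val Array(user, date) = key.split(":")   // the key was built as "user:date" above
  (user, (date, items))
}
formatted.foreach(println)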

Upvotes: 1

Durga Viswanath Gadiraju

Reputation: 3956

Here are the scripts

For Scala

val data = sc.textFile("file:///home/cloudera/data.txt")
val dataMap = data.map(x => (x.split(" +")(0), x))
val dataReduce = dataMap.reduceByKey((x, y) =>
  if(x.split(" +")(1) >= y.split(" +")(1)) x 
  else y)

val dataUserAndDateKey = data.map(rec => ((rec.split(" +")(0), rec.split(" +")(1)), rec))

val dataReduceUserAndDateKey = dataReduce.map(rec => ((rec._2.split(" +")(0), rec._2.split(" +")(1)), rec._2))

val joinData = dataUserAndDateKey.join(dataReduceUserAndDateKey)

joinData.map(rec => rec._2._1).foreach(println)

For PySpark

import re

data = sc.textFile("file:///home/cloudera/data.txt")
dataMap = data.map(lambda rec: (re.split(r'\s+', rec)[0], rec))
dataReduce = dataMap.reduceByKey(lambda x, y: x if re.split(r'\s+', x)[1] >= re.split(r'\s+', y)[1] else y)

dataUserAndDateKey = data.map(lambda rec: ((re.split(r'\s+', rec)[0], re.split(r'\s+', rec)[1]), rec))

dataReduceUserAndDateKey = dataReduce.map(lambda rec: ((re.split(r'\s+', rec[1])[0], re.split(r'\s+', rec[1])[1]), rec[1]))

joinData = dataUserAndDateKey.join(dataReduceUserAndDateKey)
for i in joinData.collect(): print(i[1][0])

Here is the output:

3    2015-12-02 13  1.4
3    2015-12-02 50  1.0
3    2015-12-02 19  7.8
2    2015-12-31 20  2.5
1    2015-12-02 8   9.4
1    2015-12-02 90  1.3

You can also use Spark SQL via a HiveContext with DataFrames.
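A rough sketch of that Spark SQL route (Spark 1.x style, where a HiveContext is needed for window functions; the table and column names are my own choices, and data is the text-file RDD from the Scala snippet above):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

val recordsDF = data.map(_.split("\\s+"))
  .map(f => (f(0), f(1), f(2).toInt, f(3).toDouble))
  .toDF("uid", "dt", "item1", "item2")
recordsDF.registerTempTable("records")

hiveContext.sql("""
  SELECT uid, dt, item1, item2
  FROM (
    SELECT *, rank() OVER (PARTITION BY uid ORDER BY dt DESC) AS rnk
    FROM records
  ) t
  WHERE rnk = 1
""").show()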

Upvotes: 2

Chris Fregly

Reputation: 1530

Assuming this data set is large, you'll likely want to partition it by date if your retrieval pattern is keyed by date.

This avoids a full scan/shuffle across all of your data on read; instead, rows are kept in the correct partition at write time.
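A hypothetical sketch of what that could look like with the DataFrame writer (the DataFrame df with a date column, the output path, and sqlContext are all assumptions here, not something from the thread):

import org.apache.spark.sql.functions.col

// write the data partitioned by date, so each date lands in its own directory
df.write
  .partitionBy("date")
  .parquet("/tmp/user_items_by_date")   // hypothetical output path

// a later read that filters on date can prune to the matching partition
sqlContext.read
  .parquet("/tmp/user_items_by_date")
  .filter(col("date") === "2015-12-31")
  .show()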

Upvotes: 0
