Reputation: 63
val data =
"""
user date item1 item2
1 2015-12-01 14 5.6
1 2015-12-01 10 0.6
1 2015-12-02 8 9.4
1 2015-12-02 90 1.3
2 2015-12-01 30 0.3
2 2015-12-01 89 1.2
2 2015-12-30 70 1.9
2 2015-12-31 20 2.5
3 2015-12-01 19 9.3
3 2015-12-01 40 2.3
3 2015-12-02 13 1.4
3 2015-12-02 50 1.0
3 2015-12-02 19 7.8
"""
If I have some data like the above, how can I get each user's latest day's records? I tried to use groupByKey, but have no idea how to proceed.
val user = data.map {
  case (user, date, item1, item2) => ((user, date), Array(item1, item2))
}.groupByKey()
and then I don't know how to deal with it. Can anyone give me some suggestions? Thanks a lot:)
I changed my data, and now each user has several records on the latest day, and I want to get all of them. Thanks :)
The result I want is:
user1 (2015-12-02,Array(8,9.4),Array(90,1.3))
user2 (2015-12-31,Array(20,2.5))
user3 (2015-12-02,Array(13,1.4),Array(50,1.0),Array(19,7.8))
Now I have written some code:
import scala.collection.mutable.ArrayBuffer

val data2 = data.trim.split("\\n")
  .filter(!_.startsWith("user"))   // skip the header row
  .map(_.split("\\s+"))
  .map { f =>
    (f(0), ArrayBuffer(
      f(1),
      f(2).toInt,
      f(3).toDouble))
  }
val data3 = sc.parallelize(data2)
data3.reduceByKey((x, y) =>
  if (x(0).toString.compareTo(y(0).toString) >= 0) x ++= y
  else y
).foreach(println)
The result is:
(2,ArrayBuffer(2015-12-31, 20, 2.5))
(1,ArrayBuffer(2015-12-02, 8, 9.4, 2015-12-02, 90, 1.3))
(3,ArrayBuffer(2015-12-02, 13, 1.4, 2015-12-02, 50, 1.0, 2015-12-02, 19, 7.8))
Is there anything I can do to improve it? :)
Upvotes: 4
Views: 4833
Reputation: 4010
This is a classic windowing-functions problem. The answer to your question is to partition by user and order by date using the rank function. All records on the same day get the same rank, so you can then filter down to the latest day's records with a rank filter.
val data = sc.textFile("/user/hadoop/data.txt")
val df = data.map(_.split("\\s+")).map(f => (f(0), f(1), f(2).toInt, f(3).toDouble)).toDF()
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val w = Window.partitionBy("_1").orderBy("_2");
df.withColumn("Rank",rank().over(w)).show()
+---+----------+---+---+----+
| _1| _2| _3| _4|Rank|
+---+----------+---+---+----+
| 3|2015-12-01| 19|9.3| 1|
| 3|2015-12-01| 40|2.3| 1|
| 3|2015-12-02| 13|1.4| 3|
| 3|2015-12-02| 50|1.0| 3|
| 3|2015-12-02| 19|7.8| 3|
| 1|2015-12-01| 14|5.6| 1|
| 1|2015-12-01| 10|0.6| 1|
| 1|2015-12-02| 8|9.4| 3|
| 1|2015-12-02| 90|1.3| 3|
| 2|2015-12-01| 30|0.3| 1|
| 2|2015-12-01| 89|1.2| 1|
| 2|2015-12-30| 70|1.9| 3|
| 2|2015-12-31| 20|2.5| 4|
+---+----------+---+---+----+
Note that with the ascending ordering above, Rank = 1 marks each user's earliest day. To keep the latest day's records instead, order the window by date descending and then filter on Rank = 1.
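A minimal sketch of that filter, reusing the imports and the _1/_2 column names from the code above (the wDesc and latest names are just illustrative):

// Order the window by date descending so each user's latest day gets rank 1
val wDesc = Window.partitionBy("_1").orderBy(desc("_2"))

// Keep only the rows from each user's latest day, then drop the helper column
val latest = df.withColumn("Rank", rank().over(wDesc))
  .filter(col("Rank") === 1)
  .drop("Rank")
latest.show()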
Upvotes: 1
Reputation: 8270
I think your best bet is to map your input data to an RDD of tuples of (user, (date, item1, item2))
so the rdd will be userRdd: RDD[(Int, (Date, Int, Double))]
From here you can create a reducer that takes two tuples and produces another of the same format, namely the tuple with the greater date value:
def reduceMaxDate(a: (Date, Int, Double), b: (Date, Int, Double)): (Date, Int, Double) = {
  if (a._1.after(b._1)) a else b
}
From here you can find the max value for each user by calling:
userRdd.reduceByKey(reduceMaxDate).
This will yield the tuple with the max timestamp for each user.
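A minimal end-to-end sketch of this approach, assuming the raw lines are already in an RDD[String] called lines (for example from sc.textFile) and that the date column is parsed with java.sql.Date.valueOf; the parsing details here are assumptions, not part of the original answer:

import java.sql.Date
import org.apache.spark.rdd.RDD

// Parse each data line into (user, (date, item1, item2)), skipping a header row if present
val userRdd: RDD[(Int, (Date, Int, Double))] = lines
  .filter(!_.startsWith("user"))
  .map { line =>
    val a = line.trim.split("\\s+")
    (a(0).toInt, (Date.valueOf(a(1)), a(2).toInt, a(3).toDouble))
  }

// reduceMaxDate (defined above) keeps the later of the two records for each user
val latestPerUser = userRdd.reduceByKey(reduceMaxDate)
latestPerUser.foreach(println)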
Upvotes: 5
Reputation: 80194
Here is my solution in the following 4 steps. Copy/paste it into the shell to see the output at each step.
//Step 1. Prepare data
val input="""user date item1 item2
1 2015-12-01 14 5.6
1 2015-12-01 10 0.6
1 2015-12-02 8 9.4
1 2015-12-02 90 1.3
2 2015-12-01 30 0.3
2 2015-12-01 89 1.2
2 2015-12-30 70 1.9
2 2015-12-31 20 2.5
3 2015-12-01 19 9.3
3 2015-12-01 40 2.3
3 2015-12-02 13 1.4
3 2015-12-02 50 1.0
3 2015-12-02 19 7.8
"""
val inputLines=sc.parallelize(input.split("\\r?\\n"))
//filter the header row
val data=inputLines.filter(l=> !l.startsWith("user") )
data.foreach(println)
//Step 2. Find the latest date of each user
val keyByUser=data.map(line => { val a = line.split("\\s+"); ( a(0), line ) })
//For each user, find his latest date
val latestByUser = keyByUser.reduceByKey( (x,y) => if(x.split("\\s+")(1) > y.split("\\s+")(1)) x else y )
latestByUser.foreach(println)
//Step 3. Join the original data with the latest date to get the result
val latestKeyedByUserAndDate = latestByUser.map( x => (x._1 + ":"+x._2.split("\\s+")(1), x._2))
val originalKeyedByUserAndDate = data.map(line => { val a = line.split("\\s+"); ( a(0) +":"+a(1), line ) })
val result=latestKeyedByUserAndDate.join(originalKeyedByUserAndDate)
result.foreach(println)
//Step 4. Transform the result into the format you desire
def createCombiner(v:(String,String)):List[(String,String)] = List[(String,String)](v)
def mergeValue(acc:List[(String,String)], value:(String,String)) : List[(String,String)] = value :: acc
def mergeCombiners(acc1:List[(String,String)], acc2:List[(String,String)]) : List[(String,String)] = acc2 ::: acc1
//use combineByKey
val transformedResult=result.mapValues(l=> { val a=l._2.split(" +"); (a(2),a(3)) } ).combineByKey(createCombiner,mergeValue,mergeCombiners)
transformedResult.foreach(println)
Upvotes: 1
Reputation: 3956
Here are the scripts
For Scala:
val data = sc.textFile("file:///home/cloudera/data.txt")
val dataMap = data.map(x => (x.split(" +")(0), x))
val dataReduce = dataMap.reduceByKey((x, y) =>
if(x.split(" +")(1) >= y.split(" +")(1)) x
else y)
val dataUserAndDateKey = data.map(rec => ((rec.split(" +")(0), rec.split(" +")(1)), rec))
val dataReduceUserAndDateKey = dataReduce.map(rec => ((rec._2.split(" +")(0), rec._2.split(" +")(1)), rec._2))
val joinData = dataUserAndDateKey.join(dataReduceUserAndDateKey)
joinData.map(rec => rec._2._1).foreach(println)
For PySpark:
import re
data = sc.textFile("file:///home/cloudera/data.txt")
dataMap = data.map(lambda rec: (re.split(r'\s+', rec)[0], rec))
dataReduce = dataMap.reduceByKey(lambda x, y: x if re.split(r'\s+', x)[1] >= re.split(r'\s+', y)[1] else y)
dataUserAndDateKey = data.map(lambda rec: ((re.split(r'\s+', rec)[0], re.split(r'\s+', rec)[1]), rec))
dataReduceUserAndDateKey = dataReduce.map(lambda rec: ((re.split(r'\s+', rec[1])[0], re.split(r'\s+', rec[1])[1]), rec[1]))
joinData = dataUserAndDateKey.join(dataReduceUserAndDateKey)
for i in joinData.collect(): print(i[1][0])
Here is the output:
3 2015-12-02 13 1.4
3 2015-12-02 50 1.0
3 2015-12-02 19 7.8
2 2015-12-31 20 2.5
1 2015-12-02 8 9.4
1 2015-12-02 90 1.3
You can also run SQL over data frames using a HiveContext created from the SparkContext.
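For example, a rough sketch of the SQL route, assuming data is the RDD of raw lines read with sc.textFile above; the HiveContext setup and the userid/dt column names are assumptions, not taken from the scripts:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

// Build a DataFrame from the parsed lines and register it as a temporary table
val df = data.map(_.split(" +"))
  .map(a => (a(0), a(1), a(2).toInt, a(3).toDouble))
  .toDF("userid", "dt", "item1", "item2")
df.registerTempTable("records")

// Keep only the rows whose date equals each user's latest date
val latest = hiveContext.sql(
  """SELECT r.* FROM records r
     JOIN (SELECT userid, MAX(dt) AS max_dt FROM records GROUP BY userid) m
       ON r.userid = m.userid AND r.dt = m.max_dt""")
latest.show()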
Upvotes: 2
Reputation: 1530
Assuming this data set is large, you'll likely want to partition it by date if your data retrieval pattern is keyed by date.
This avoids a full scan/shuffle across all of your data on read; instead, rows land in the correct partition on write. A sketch is shown below.
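For instance, a minimal sketch of writing the data partitioned by a date column (the df DataFrame, the dt column name, and the output path are all assumptions for illustration):

// Write the records partitioned by date so reads filtered on date only
// touch the matching partition directories instead of scanning everything
df.write.partitionBy("dt").parquet("/user/hadoop/records_by_date")

// A read that filters on the partition column can prune partitions
val oneDay = sqlContext.read.parquet("/user/hadoop/records_by_date")
  .filter("dt = '2015-12-02'")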
Upvotes: 0