Reputation: 87
I'm working with a structured input file whose fields are userId, seqId, eventType, and country. I need to reduce it by userId, taking the last non-empty value of each field after ordering by seqId. For the given input:
userId seqId eventType country
A1600001 2 Update JP
A1600001 3 Update
B2301001 2 Update CH
A1600001 1 Create CH
C1200011 2 Update
C1200011 1 Create IN
The reduced result should be:
A1600001 3 Update JP
C1200011 2 Update IN
B2301001 2 Update CH
I started with the following:
scala> val file = sc.textFile("/tmp/sample-events.tsv")
scala> val lines = file.map( x => (x.split("\t")(0), x) )
scala> lines.foreach(x => println(x))
(A1600001,A1600001 2 Update JP)
(A1600001,A1600001 3 Update )
(B2301001,B2301001 2 Update CH)
(A1600001,A1600001 1 Create CH)
(C1200011,C1200011 2 Update )
(C1200011,C1200011 1 Create IN)
Now I want to reduceByKey the lines (I guess?), but I'm pretty new to the subject and I don't know how to construct the reduction function. Can someone help?
Upvotes: 1
Views: 294
Reputation: 8751
Using Spark SQL and window functions:
scala> val df = Seq(("A1600001",2,"Update","JP"),("A1600001",3,"Update",""),("B2301001",2,"Update","CH"),("A1600001",1,"Create","CH"),("C1200011",2,"Update",""),("C1200011",1,"Create","IN")).toDF("userId","seqId","eventType","country")
df: org.apache.spark.sql.DataFrame = [userId: string, seqId: int ... 2 more fields]
scala> df.createOrReplaceTempView("samsu")
scala> spark.sql(""" with tb1 as (
     |   select userId, seqId, eventType, country,
     |     lag(country) over(partition by userId order by seqId) lg1,
     |     row_number() over(partition by userId order by seqId) rw1,
     |     count(*) over(partition by userId) cw1
     |   from samsu)
     | select userId, seqId, eventType,
     |   case when country="" then lg1 else country end country
     | from tb1 where rw1=cw1 """).show(false)
+--------+-----+---------+-------+
|userId |seqId|eventType|country|
+--------+-----+---------+-------+
|A1600001|3 |Update |JP |
|C1200011|2 |Update |IN |
|B2301001|2 |Update |CH |
+--------+-----+---------+-------+
scala>
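One caveat about the query above: lag(country) reaches back only one row, so a user whose final two rows both had an empty country would still come out empty. The intended "last non-empty value after ordering by seqId" semantics can be sketched in plain Scala with no cluster required; records and lastNonEmpty below are hypothetical names, with the question's sample data inlined:

```scala
// Each record: (userId, seqId, eventType, country); "" marks a missing value.
val records = List(
  ("A1600001", 2, "Update", "JP"),
  ("A1600001", 3, "Update", ""),
  ("B2301001", 2, "Update", "CH"),
  ("A1600001", 1, "Create", "CH"),
  ("C1200011", 2, "Update", ""),
  ("C1200011", 1, "Create", "IN")
)

// For one user's field values sorted by seqId, carry the last non-empty one
// forward, however many empty values follow it.
def lastNonEmpty(values: Seq[String]): String =
  values.foldLeft("")((carry, v) => if (v.nonEmpty) v else carry)

val reduced = records
  .groupBy(_._1)
  .toList
  .map { case (user, rows) =>
    val sorted = rows.sortBy(_._2)
    (user, sorted.last._2, lastNonEmpty(sorted.map(_._3)), lastNonEmpty(sorted.map(_._4)))
  }
// reduced contains (A1600001,3,Update,JP), (B2301001,2,Update,CH), (C1200011,2,Update,IN)
```

In Spark SQL the same effect could be had by replacing lag with a running last-non-empty window, but the sketch above is the semantics the question asks for.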
Upvotes: 1
Reputation: 267
The simplest solution I can think of with reduceByKey is below.
// 0: userId  1: seqId  2: eventType  3: country
val inputRdd = spark.sparkContext.textFile("data/input.txt")
  .map(_.split("\t", -1)) // limit -1 keeps trailing empty fields, so every row has 4 columns

// Tag each field value with the seqId it was seen at (Int.MinValue for empty
// values), so the reduce is order-independent: reduceByKey may combine records
// in any order across partitions and the latest non-empty value still wins.
inputRdd
  .map { ls =>
    val seq = ls(1).toInt
    def tagged(v: String) = (if (v.isEmpty) Int.MinValue else seq, v)
    (ls(0), (seq, tagged(ls(2)), tagged(ls(3))))
  }
  .reduceByKey { case ((s1, e1, c1), (s2, e2, c2)) =>
    def latest(a: (Int, String), b: (Int, String)) = if (a._1 >= b._1) a else b
    (s1 max s2, latest(e1, e2), latest(c1, c2))
  }
  .map { case (id, (seq, ev, co)) => Seq(id, seq.toString, ev._2, co._2).mkString("\t") }
  .foreach(println)
data/input.txt
A1600001 2 Update JP
A1600001 3 Update
B2301001 2 Update CH
A1600001 1 Create CH
C1200011 2 Update
C1200011 1 Create IN
Output:
B2301001 2 Update CH
C1200011 2 Update IN
A1600001 3 Update JP
Upvotes: 0
Reputation: 3544
One possible way (assuming that seqId is never empty):

1. Prepare pair_rdd1 by filtering out all empty eventType values with a mapper first, then apply reduceByKey on key=userId to find the latest non-empty eventType per userId. Assuming the reducer function takes two [seqId, eventType] pairs and returns a [seqId, eventType] pair, the reduce function should look like: (v1, v2) => if (v1.seqId > v2.seqId) v1 else v2
2. Prepare pair_rdd2 by filtering out all empty country values with a mapper first, then apply reduceByKey on key=userId to find the latest non-empty country per userId. The reducer is the same as above, just over [seqId, country] pairs.
3. Since we need the latest seqId per userId as well, prepare pair_rdd3 by applying reduceByKey on key=userId with the reducer function (seqId1, seqId2) => max(seqId1, seqId2).
4. Perform pair_rdd3.leftOuterJoin(pair_rdd1) to get [userId, seqId, eventType], then on the result of that left join perform .leftOuterJoin(pair_rdd2) to finally get [userId, seqId, eventType, country] (both joins are on key=userId). Note that we use left joins instead of inner joins here, since there could be user IDs with all eventType or all country values empty.
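The plan above can be sketched with plain Scala collections standing in for the RDDs: groupBy plus reduce in place of reduceByKey, and Map lookups in place of leftOuterJoin. All names are illustrative, with the question's sample data inlined:

```scala
// (userId, seqId, eventType, country); "" marks an empty field.
val rows = List(
  ("A1600001", 2, "Update", "JP"),
  ("A1600001", 3, "Update", ""),
  ("B2301001", 2, "Update", "CH"),
  ("A1600001", 1, "Create", "CH"),
  ("C1200011", 2, "Update", ""),
  ("C1200011", 1, "Create", "IN")
)

// Reducer for steps 1 and 2: keep the [seqId, value] pair with the larger seqId.
def latest(v1: (Int, String), v2: (Int, String)) =
  if (v1._1 > v2._1) v1 else v2

// Step 1 (pair_rdd1): latest non-empty eventType per userId.
val pairRdd1 = rows.filter(_._3.nonEmpty)
  .groupBy(_._1)
  .map { case (u, rs) => u -> rs.map(r => (r._2, r._3)).reduce(latest) }

// Step 2 (pair_rdd2): latest non-empty country per userId.
val pairRdd2 = rows.filter(_._4.nonEmpty)
  .groupBy(_._1)
  .map { case (u, rs) => u -> rs.map(r => (r._2, r._4)).reduce(latest) }

// Step 3 (pair_rdd3): max seqId per userId.
val pairRdd3 = rows.groupBy(_._1)
  .map { case (u, rs) => u -> rs.map(_._2).max }

// Step 4: left joins — a field may be absent entirely, hence getOrElse.
val result = pairRdd3.toList.map { case (u, seq) =>
  (u, seq,
    pairRdd1.get(u).map(_._2).getOrElse(""),
    pairRdd2.get(u).map(_._2).getOrElse(""))
}
```

In actual Spark code the shape stays the same; only the operators change to filter/map/reduceByKey/leftOuterJoin on pair RDDs.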
Upvotes: 0