samsu

Reputation: 87

Spark - reducing input file by user Id

I'm working with a structured input file that contains userId, seqId, eventType, and country. I need to reduce it by userId, taking the last non-empty value of each field after ordering by seqId. For the given input:

userId    seqId eventType country
A1600001    2   Update  JP
A1600001    3   Update  
B2301001    2   Update  CH
A1600001    1   Create  CH
C1200011    2   Update  
C1200011    1   Create  IN

The reduced result should be:

A1600001    3   Update  JP
C1200011    2   Update  IN
B2301001    2   Update  CH

I started with the following:

scala> val file = sc.textFile("/tmp/sample-events.tsv")
scala> val lines = file.map( x => (x.split("\t")(0), x) )
scala> lines.foreach(x => println(x))
(A1600001,A1600001  2   Update  JP)
(A1600001,A1600001  3   Update  )
(B2301001,B2301001  2   Update  CH)
(A1600001,A1600001  1   Create  CH)
(C1200011,C1200011  2   Update  )
(C1200011,C1200011  1   Create  IN)

Now I want to apply reduceByKey to lines (I guess?), but I'm pretty new to this and I don't know how to construct the reduction function. Can someone help?

Upvotes: 1

Views: 294

Answers (3)

stack0114106

Reputation: 8751

Using spark-sql and window functions.

scala> val df = Seq(("A1600001",2,"Update","JP"),("A1600001",3,"Update",""),("B2301001",2,"Update","CH"),("A1600001",1,"Create","CH"),("C1200011",2,"Update",""),("C1200011",1,"Create","IN")).toDF("userId","seqId","eventType","country")
df: org.apache.spark.sql.DataFrame = [userId: string, seqId: int ... 2 more fields]

scala> df.createOrReplaceTempView("samsu")

scala> spark.sql("""
     | with tb1 as (
     |   select userId, seqId, eventType, country,
     |          lag(country) over (partition by userId order by seqId) lg1,
     |          row_number() over (partition by userId order by seqId) rw1,
     |          count(*) over (partition by userId) cw1
     |   from samsu)
     | select userId, seqId, eventType,
     |        case when country = "" then lg1 else country end country
     | from tb1
     | where rw1 = cw1
     | """).show(false)
+--------+-----+---------+-------+                                              
|userId  |seqId|eventType|country|
+--------+-----+---------+-------+
|A1600001|3    |Update   |JP     |
|C1200011|2    |Update   |IN     |
|B2301001|2    |Update   |CH     |
+--------+-----+---------+-------+


scala>
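
For anyone who prefers the DataFrame API over plain SQL, here is a rough equivalent (my own sketch, not part of the original answer; it assumes the same df and rebuilds the same lg1/rw1/cw1 window columns):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// same windows as in the SQL: per-user ordered by seqId, and per-user unordered
val w    = Window.partitionBy("userId").orderBy("seqId")
val wAll = Window.partitionBy("userId")

df.withColumn("lg1", lag("country", 1).over(w))   // previous row's country
  .withColumn("rw1", row_number().over(w))        // position within the user
  .withColumn("cw1", count(lit(1)).over(wAll))    // number of rows for the user
  .where(col("rw1") === col("cw1"))               // keep only the last row per user
  .select(
    col("userId"), col("seqId"), col("eventType"),
    when(col("country") === "", col("lg1")).otherwise(col("country")).as("country"))
  .show(false)

Like the SQL version, this only falls back one row via lag, which is enough for the sample data but would not find a non-empty country further back in the history.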

Upvotes: 1

Bhima Rao Gogineni

Reputation: 267

The simplest solution I can think of, using reduceByKey, is below.

//0: userId    1: seqId  2: eventType 3: country
val inputRdd = spark.sparkContext.textFile("data/input.txt")
  .map(_.split("\\s+", 4))

// Reduce by userId, keeping the record with the highest seqId.
// Sorting by seqId first means that, when the latest record is missing the
// country (only 3 fields after the split), it can pick up the country from
// the immediately preceding record.
inputRdd
  .map(ls => (ls(0), ls))
  .sortBy(_._2(1).toInt)
  .reduceByKey {
    (acc, y) =>
      if (acc(1).toInt < y(1).toInt)
        if (y.length == 3 && acc.length == 4) y :+ acc(3) else y
      else
        acc
  }.map(_._2.mkString("\t"))
  .foreach(println)

data/input.txt

A1600001    2   Update  JP
A1600001    3   Update
B2301001    2   Update  CH
A1600001    1   Create  CH
C1200011    2   Update
C1200011    1   Create  IN

Output:

B2301001    2   Update  CH
C1200011    2   Update  IN
A1600001    3   Update  JP
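
If you need the more general behaviour from the question (last non-empty value of every field, not only country), a field-wise merge can be sketched on top of the same inputRdd. This is my own illustration rather than the answer above: it tags every field with the seqId it was taken from, which keeps the reducer associative so no prior sort is needed.

// Sketch only: keep, per userId, the latest non-empty value of each column.
val merged = inputRdd
  .map(_.padTo(4, ""))                                   // make sure the country slot exists
  .map { ls =>
    val seq = ls(1).toInt
    // tag each field with its record's seqId; empty fields get Int.MinValue
    (ls(0), ls.map(v => (if (v.nonEmpty) seq else Int.MinValue, v)))
  }
  .reduceByKey { (a, b) =>
    // per field, keep the value carrying the larger seqId tag
    a.zip(b).map { case (x, y) => if (x._1 >= y._1) x else y }
  }
  .map { case (_, fields) => fields.map(_._2).mkString("\t") }

merged.foreach(println)

For the sample data this should print the same three rows as above, possibly in a different order.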

Upvotes: 0

mangusta

Reputation: 3544

One possible way (assuming that seqId is never empty):

  1. prepare pair_rdd1 by first filtering out all empty eventType values with a mapper, then applying reduceByKey on key=userId to find the latest non-empty eventType per userId. Assuming the reducer function takes two [seqId, eventType] pairs and returns a [seqId, eventType] pair, the reduce function should look like: (v1, v2) => ( if (v1[seqId] > v2[seqId]) then v1 else v2 )
  2. prepare pair_rdd2 by first filtering out all empty country values with a mapper, then applying reduceByKey on key=userId to find the latest non-empty country per userId. Assuming the reducer function takes two [seqId, country] pairs and returns a [seqId, country] pair, the reduce function should look like: (v1, v2) => ( if (v1[seqId] > v2[seqId]) then v1 else v2 )
  3. since we need the latest seqId per userId as well, prepare pair_rdd3 by applying reduceByKey on key=userId with the reducer function: (seqId1, seqId2) => max(seqId1, seqId2)
  4. now perform pair_rdd3.leftOuterJoin(pair_rdd1) to get [userId, seqId, eventType], then on the result of that left join perform .leftOuterJoin(pair_rdd2) to finally get [userId, seqId, eventType, country] (both joins are on key=userId); see the sketch below

Note that we use left joins instead of inner joins here, since there could be user IDs whose eventType or country values are all empty.
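
A rough RDD sketch of these steps (my own illustration, not part of this answer; it assumes the tab-separated file from the question, and all names are invented):

// build (userId, seqId, eventType, country) tuples from the question's file
val rows = sc.textFile("/tmp/sample-events.tsv")
  .map(_.split("\t").padTo(4, ""))               // pad in case the country field is missing
  .map(a => (a(0), a(1).toInt, a(2), a(3)))

// 1. latest non-empty eventType per userId
val pairRdd1 = rows.filter(_._3.nonEmpty)
  .map { case (id, seq, ev, _) => (id, (seq, ev)) }
  .reduceByKey((v1, v2) => if (v1._1 > v2._1) v1 else v2)

// 2. latest non-empty country per userId
val pairRdd2 = rows.filter(_._4.nonEmpty)
  .map { case (id, seq, _, c) => (id, (seq, c)) }
  .reduceByKey((v1, v2) => if (v1._1 > v2._1) v1 else v2)

// 3. latest seqId per userId
val pairRdd3 = rows.map { case (id, seq, _, _) => (id, seq) }
  .reduceByKey((s1, s2) => math.max(s1, s2))

// 4. left joins keep users whose eventType or country is always empty
val result = pairRdd3
  .leftOuterJoin(pairRdd1)
  .leftOuterJoin(pairRdd2)
  .map { case (id, ((seq, ev), c)) =>
    (id, seq, ev.map(_._2).getOrElse(""), c.map(_._2).getOrElse(""))
  }

result.foreach(println)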

Upvotes: 0
