Reputation: 163
I have two RDDs. The first has the fields (User ID, Mov ID, Rating, Timestamp):
data_wo_header: RDD[String]
scala> data_wo_header.take(5).foreach(println)
1,2,3.5,1112486027
1,29,3.5,1112484676
1,32,3.5,1112484819
1,47,3.5,1112484727
1,50,3.5,1112484580
The second, RDD2, has the fields (User ID, Mov ID):
data_test_wo_header: RDD[String]
scala> data_test_wo_header.take(5).foreach(println)
1,2
1,367
1,1009
1,1525
1,1750
I need to join the two RDDs so that the entries whose (User ID, Mov ID) also appear in RDD2 are removed from RDD1. Can someone suggest a Scala Spark join for these two RDDs? I also need a second join that produces a new RDD, derived from RDD1, containing only the common entries.
Upvotes: 0
Views: 666
Reputation: 163
A simple approach is to key both RDDs by (User ID, Mov ID) and use subtractByKey. The following works for me:
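// dropheader is not shown here; it presumably strips the CSV header. Key each rating by (UserID, MovID).
val data_wo_header = dropheader(data).map(_.split(",")).map(x => ((x(0), x(1)), (x(2), x(3))))
// Key the test pairs the same way; the value 1 is only a placeholder.
val data_test_wo_header = dropheader(data_test).map(_.split(",")).map(x => ((x(0), x(1)), 1))
// Ratings whose key does not appear in the test pairs.
val ratings_train = data_wo_header.subtractByKey(data_test_wo_header)
// Ratings whose key does appear in the test pairs (everything removed in the previous step).
val ratings_test = data_wo_header.subtractByKey(ratings_train)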
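For anyone who wants to try this outside the shell, here is a minimal self-contained sketch of the same idea on a few rows from the question. The object name `SubtractByKeySketch`, the `split` helper, and the local master setting are my own assumptions for illustration, not part of the original code. It gets the common entries with an inner `join` instead of a second `subtractByKey`; note that `join` would repeat a rating if the same (User ID, Mov ID) pair occurred more than once in the test RDD.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SubtractByKeySketch {
  type Key = (String, String)    // (UserID, MovID)
  type Value = (String, String)  // (Rating, Timestamp)

  // Hypothetical helper: returns (ratings without common keys, ratings with common keys only).
  def split(ratings: RDD[String], pairs: RDD[String]): (RDD[(Key, Value)], RDD[(Key, Value)]) = {
    // Key both RDDs by (UserID, MovID)
    val keyedRatings = ratings.map(_.split(",")).map(x => ((x(0), x(1)), (x(2), x(3))))
    val keyedPairs = pairs.map(_.split(",")).map(x => ((x(0), x(1)), 1))

    // Ratings whose key is absent from the pairs
    val withoutCommon = keyedRatings.subtractByKey(keyedPairs)
    // Ratings whose key is present in the pairs; the placeholder value is dropped after the join
    val onlyCommon = keyedRatings.join(keyedPairs).mapValues(_._1)
    (withoutCommon, onlyCommon)
  }

  def main(args: Array[String]): Unit = {
    // Local master is an assumption so the sketch runs without a cluster
    val sc = new SparkContext(new SparkConf().setAppName("subtract-by-key-sketch").setMaster("local[*]"))

    // Sample rows copied from the question
    val ratings = sc.parallelize(Seq(
      "1,2,3.5,1112486027",
      "1,29,3.5,1112484676",
      "1,32,3.5,1112484819"))
    val pairs = sc.parallelize(Seq("1,2", "1,367"))

    val (train, test) = split(ratings, pairs)
    train.collect().foreach(println)  // the ratings for movies 29 and 32
    test.collect().foreach(println)   // the rating for movie 2
    sc.stop()
  }
}
```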
Upvotes: 0
Reputation: 46
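First convert your RDDs to DataFrames, because the DataFrame API offers SQL-like operations such as join and select.
To convert an RDD to a DataFrame you need an RDD of structured records (case class instances or Row objects) rather than an RDD[String].
import sqlContext.implicits._
import org.apache.spark.sql.functions.lit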
case class cs1(UserID: Int, MovID: Int, Rating: String, Timestamp: String)
case class cs2(UserID: Int, MovID: Int)
val df1 = data_wo_header.map(row => {
  val splits = row.split(",")
  cs1(splits(0).toInt, splits(1).toInt, splits(2), splits(3))
}).toDF("UserID", "MovID", "Rating", "Timestamp")
val df2 = data_test_wo_header.map(row => {
  val splits = row.split(",")
  cs2(splits(0).toInt, splits(1).toInt)
}).toDF("UserID", "MovID")
Now add a marker column to df2:
val df2Prime = df2.withColumn("isPresent", lit(1))
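Then left join df1 with df2Prime on (UserID, MovID). Rows of df1 that have no match in df2 get a null isPresent, so keeping the rows where isPresent is null removes the common entries, while keeping the rows where isPresent is 1 gives the intersection. In both cases, drop the temporary isPresent flag afterwards.
val temp = df1.join(df2Prime, Seq("UserID", "MovID"), "left")
val withoutCommon = temp.filter(temp("isPresent").isNull).drop("isPresent")  // df1 rows not in df2
val onlyCommon = temp.filter(temp("isPresent") === 1).drop("isPresent")      // df1 rows also in df2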
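If you are on Spark 2.0 or later (an assumption; the question does not state a version), the "left_anti" and "left_semi" join types should give both results directly, without a marker column. The sketch below is only an illustration: the object name `AntiSemiJoinSketch`, the two helper functions, and the local master setting are hypothetical, and the sample rows are taken from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object AntiSemiJoinSketch {
  private val keys = Seq("UserID", "MovID")

  // Rows of `ratings` that have no matching (UserID, MovID) in `pairs`
  def withoutCommon(ratings: DataFrame, pairs: DataFrame): DataFrame =
    ratings.join(pairs, keys, "left_anti")

  // Rows of `ratings` that do have a matching (UserID, MovID) in `pairs`
  def onlyCommon(ratings: DataFrame, pairs: DataFrame): DataFrame =
    ratings.join(pairs, keys, "left_semi")

  def main(args: Array[String]): Unit = {
    // Local master is an assumption so the sketch runs without a cluster
    val spark = SparkSession.builder()
      .appName("anti-semi-join-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Sample rows copied from the question
    val df1 = Seq(
      (1, 2, "3.5", "1112486027"),
      (1, 29, "3.5", "1112484676"),
      (1, 32, "3.5", "1112484819")
    ).toDF("UserID", "MovID", "Rating", "Timestamp")
    val df2 = Seq((1, 2), (1, 367)).toDF("UserID", "MovID")

    withoutCommon(df1, df2).show()  // the ratings for movies 29 and 32
    onlyCommon(df1, df2).show()     // the rating for movie 2
    spark.stop()
  }
}
```

Both join types return only the columns of the left DataFrame, and neither repeats a rating when the same pair occurs several times on the right side.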
Upvotes: 0