Prateek Agrawal

Reputation: 163

Join RDDs to get the complement of the intersection

I have two RDDs. The first holds (User ID, Mov ID, Rating, Timestamp):

data_wo_header: RDD[String]
scala> data_wo_header.take(5).foreach(println)
1,2,3.5,1112486027
1,29,3.5,1112484676
1,32,3.5,1112484819
1,47,3.5,1112484727
1,50,3.5,1112484580

and the second, RDD2, holds (User ID, Mov ID):

data_test_wo_header: RDD[String]
scala> data_test_wo_header.take(5).foreach(println)
1,2
1,367
1,1009
1,1525
1,1750

I need to join the two RDDs so that every entry whose (User ID, Mov ID) also appears in RDD2 is removed from RDD1. Can someone suggest a Scala Spark join for these two RDDs? I also need a second join that produces a new RDD, derived from RDD1, containing only the common entries.

Upvotes: 0

Views: 666

Answers (2)

Prateek Agrawal

Reputation: 163

A very easy approach is to use subtractByKey. The following works for me:

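// Key every rating row by (User ID, Mov ID); the value is (Rating, Timestamp)
val data_wo_header = dropheader(data).map(_.split(",")).map(x => ((x(0), x(1)), (x(2), x(3))))
// Key the test pairs the same way; the value 1 is only a placeholder
val data_test_wo_header = dropheader(data_test).map(_.split(",")).map(x => ((x(0), x(1)), 1))
// Complement: ratings whose key does not appear in the test set
val ratings_train = data_wo_header.subtractByKey(data_test_wo_header)
// Intersection: whatever is left of the ratings after removing the complement
val ratings_test = data_wo_header.subtractByKey(ratings_train)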
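
For anyone who wants to try this outside the shell, here is a minimal, self-contained sketch of the same idea on a few of the sample rows. It assumes Spark 2.x or later running in local mode. The object name SubtractByKeySketch and the helpers keyRatings and keyPairs are made up for this illustration, and the intersection is computed with a plain pair-RDD join rather than a second subtractByKey.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical names for illustration only; not part of the original answer.
object SubtractByKeySketch {
  type Key = (String, String)    // (User ID, Mov ID)
  type Value = (String, String)  // (Rating, Timestamp)

  // Key each "user,movie,rating,timestamp" line by (user, movie).
  def keyRatings(lines: RDD[String]): RDD[(Key, Value)] =
    lines.map(_.split(",")).map(x => ((x(0), x(1)), (x(2), x(3))))

  // Key each "user,movie" line by (user, movie); the value is a placeholder.
  def keyPairs(lines: RDD[String]): RDD[(Key, Int)] =
    lines.map(_.split(",")).map(x => ((x(0), x(1)), 1))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("subtract-by-key-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val ratings = keyRatings(sc.parallelize(Seq(
        "1,2,3.5,1112486027",
        "1,29,3.5,1112484676",
        "1,32,3.5,1112484819")))
      val testPairs = keyPairs(sc.parallelize(Seq("1,2", "1,367")))

      // Complement: keys (1,29) and (1,32) remain, since (1,2) is in the test set.
      val train = ratings.subtractByKey(testPairs)
      // Intersection: inner join on the key, then keep only the rating side.
      val test = ratings.join(testPairs).mapValues(_._1)

      train.collect().foreach(println)
      test.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

One difference to keep in mind: join emits one row per matching pair, so a key that is repeated in the test set would duplicate the matching rating rows, while the double subtractByKey above does not.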

Upvotes: 0

Swadhin Shahriar

Reputation: 46

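First convert your RDDs to DataFrames, because the DataFrame API offers SQL-like operations such as join and select.

To convert an RDD to a DataFrame you need an RDD of structured records (Row objects or, as here, case class instances) instead of RDD[String].

import sqlContext.implicits._
import org.apache.spark.sql.functions.lit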

case class cs1(UserID: Int, MovID: Int, Rating: String, Timestamp: String)

case class cs2(UserID: Int, MovID: Int)

val df1 = data_wo_header.map(row => {
   val splits = row.split(",")

   cs1(splits(0).toInt, splits(1).toInt, splits(2),splits(3))
}).toDF("UserID", "MovID", "Rating", "Timestamp")

val df2 = data_test_wo_header.map(row => {
   val splits = row.split(",")

   cs2(splits(0).toInt, splits(1).toInt)
}).toDF("UserID", "MovID")

Now add a flag column to df2:

val df2Prime = df2.withColumn("isPresent", lit(1))

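Then left join df1 with df2Prime. Rows of df1 that have no match in df2 get a null isPresent, so keeping the rows where isPresent is null gives the complement, and keeping the rows where isPresent is 1 gives the intersection. In both cases, drop the temporary isPresent flag afterwards. Note that comparing with =!= 1 would not work here: a comparison against null evaluates to null, so the unmatched rows would be filtered out as well.

val temp = df1.join(df2Prime, usingColumns = Seq("UserID", "MovID"), "left")

// Complement: rows of df1 with no match in df2
temp.filter(temp("isPresent").isNull).drop("isPresent")

// Intersection: rows of df1 that also appear in df2
temp.filter(temp("isPresent") === 1).drop("isPresent")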
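
As an alternative to the flag column, Spark 2.0 and later accept "left_anti" and "left_semi" as join types, which return the non-matching and matching rows of the left side directly. The following is a minimal sketch under that assumption, run in local mode on a few of the sample rows. The object name AntiSemiJoinSketch and the helpers complement and common are made up for this illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical names for illustration only; not part of the original answer.
object AntiSemiJoinSketch {
  private val keys = Seq("UserID", "MovID")

  // Rows of df1 whose (UserID, MovID) does NOT appear in df2.
  def complement(df1: DataFrame, df2: DataFrame): DataFrame =
    df1.join(df2, keys, "left_anti")

  // Rows of df1 whose (UserID, MovID) DOES appear in df2.
  def common(df1: DataFrame, df2: DataFrame): DataFrame =
    df1.join(df2, keys, "left_semi")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("anti-semi-join-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    try {
      val df1 = Seq(
        (1, 2, "3.5", "1112486027"),
        (1, 29, "3.5", "1112484676"),
        (1, 32, "3.5", "1112484819")
      ).toDF("UserID", "MovID", "Rating", "Timestamp")
      val df2 = Seq((1, 2), (1, 367)).toDF("UserID", "MovID")

      complement(df1, df2).show() // rows for MovID 29 and 32
      common(df1, df2).show()     // row for MovID 2
    } finally {
      spark.stop()
    }
  }
}
```

Both join types return only the columns of df1, so no temporary column has to be added or dropped, and a key repeated in df2 does not duplicate rows of df1.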

Upvotes: 0
