Siddardha Budige

Reputation: 1015

How to perform a join operation on two SchemaRDDs?

I have two SchemaRDDs and I want to perform a join operation on them (like a SQL join). Please help me.

Upvotes: 1

Views: 8504

Answers (2)

Spiro Michaylov

Reputation: 3571

You can actually do a SQL join if you register the two SchemaRDDs as tables. The following example uses case classes, but the technique doesn't depend on them:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._  // brings in the implicit conversion from RDD to SchemaRDD

case class Score(name: String, score: Int)
case class Age(name: String, age: Int)

// Parse comma-separated lines into case-class records.
val scores = sc.textFile("scores.txt").map(_.split(",")).map(s => Score(s(0), s(1).trim.toInt))
val ages = sc.textFile("ages.txt").map(_.split(",")).map(s => Age(s(0), s(1).trim.toInt))

// Register both SchemaRDDs so they can be referenced by name in SQL.
scores.registerAsTable("scores")
ages.registerAsTable("ages")

val joined = sqlContext.sql("""
    SELECT a.name, a.age, s.score
    FROM ages a JOIN scores s
    ON a.name = s.name""")
joined.collect().foreach(println)
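For concreteness, suppose scores.txt and ages.txt contain comma-separated lines like the ones below (hypothetical data; the question doesn't show the real files). The split-and-trim parsing used above can then be exercised on plain Scala collections, without a Spark cluster:

```scala
case class Score(name: String, score: Int)
case class Age(name: String, age: Int)

// Hypothetical file contents, one record per line.
val scoresLines = Seq("alice, 90", "bob, 80")
val agesLines = Seq("alice, 30", "bob, 25")

// Same parsing logic as the Spark example above.
val scores = scoresLines.map(_.split(",")).map(s => Score(s(0), s(1).trim.toInt))
val ages = agesLines.map(_.split(",")).map(s => Age(s(0), s(1).trim.toInt))
```

With input like this, the SQL join would produce one row per name present in both files, carrying that person's age and score.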

Another approach, which doesn't require registration, is to use the language-integrated query syntax (one suspects there is a way to streamline this):

// Inner is the join type; in this era of the API it lives in Spark's
// Catalyst package (believed to be org.apache.spark.sql.catalyst.plans.Inner).
import org.apache.spark.sql.catalyst.plans.Inner

val scoresAliased = scores.as('s)
val agesAliased = ages.as('a)
val joined =
  scoresAliased.join(agesAliased, Inner, Some("s.name".attr === "a.name".attr))

Upvotes: 4

mattwise

Reputation: 1506

From the Spark documentation:

join(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are also supported through leftOuterJoin and rightOuterJoin.

You would want to run your SchemaRDDs through a map transformation to put them in (K, V) form: K is the key you want to join on, and V can just be the entire row object. E.g.:

// Hypothetical: assumes each row exposes a `key` field to join on.
val a = ...
val b = ...
val bWithKey = b.map(v => (v.key, v))
val joined = a.map(v => (v.key, v)).join(bWithKey)
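To see the (K, (V, W)) semantics the documentation describes without a cluster, here is a plain-Scala sketch of an inner join on pairs (the data and keys are made up for illustration):

```scala
// Plain-Scala illustration of pair-join semantics: for each key present
// in both collections, emit (key, (leftValue, rightValue)).
val scores = Seq(("alice", 90), ("bob", 80))
val ages = Seq(("alice", 30), ("bob", 25), ("carol", 40))

val joined = for {
  (k1, v) <- scores
  (k2, w) <- ages
  if k1 == k2
} yield (k1, (v, w))
// joined: Seq(("alice", (90, 30)), ("bob", (80, 25)))
// "carol" is dropped because inner joins keep only keys present on both sides;
// leftOuterJoin / rightOuterJoin would retain unmatched keys from one side.
```

Spark's `join` does the same thing, distributed by key across partitions.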

Upvotes: 2
