Reputation: 1015
I have two SchemaRDDs and I want to perform a join operation on them (like a SQL join). Please help me.
Upvotes: 1
Views: 8504
Reputation: 3571
You can actually do a SQL join if you register the two SchemaRDDs as tables. While the following example uses case classes, the technique doesn't depend on them:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
case class Score(name: String, score: Int)
case class Age(name: String, age: Int)
// Parse each comma-separated line into a case class instance
val scores = sc.textFile("scores.txt").map(_.split(",")).map(s => Score(s(0), s(1).trim.toInt))
val ages = sc.textFile("ages.txt").map(_.split(",")).map(s => Age(s(0), s(1).trim.toInt))
// Register each SchemaRDD as a table so SQL can reference it by name
scores.registerAsTable("scores")
ages.registerAsTable("ages")
val joined = sqlContext.sql("""
SELECT a.name, a.age, s.score
FROM ages a JOIN scores s
ON a.name = s.name""")
joined.collect().foreach(println)
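For illustration, if the input files contained hypothetical lines such as these (the data is my assumption, not part of the original answer):

// scores.txt    // ages.txt
// alice,90      // alice,30
// bob,85        // bob,25

the println calls would print rows such as [alice,30,90] and [bob,25,85].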
Another approach, which doesn't require registration, is to use the language-integrated query syntax (one suspects there is a way to streamline this):
import org.apache.spark.sql.catalyst.plans.Inner
// Alias each SchemaRDD so columns can be qualified in the join condition
val scoresAliased = scores.as('s)
val agesAliased = ages.as('a)
val joined =
  scoresAliased.join(agesAliased, Inner, Some("s.name".attr === "a.name".attr))
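The result is itself a SchemaRDD, so you can keep chaining language-integrated operators. A minimal follow-up sketch (assuming the qualified column names resolve the same way they do in the join condition above):

joined.select("a.name".attr, "a.age".attr, "s.score".attr)
  .collect().foreach(println)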
Upvotes: 4
Reputation: 1506
From the Spark documentation:
join(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are also supported through leftOuterJoin and rightOuterJoin.
You would want to run your SchemaRDDs through a map transformation to put them in (K, V) form: K is the key you want to join on, and V can just be the entire row object. E.g.
val a = ... // one SchemaRDD
val b = ... // the other SchemaRDD
// v.key stands in for however you extract the join key from a row
val bWithKey = b.map(v => (v.key, v))
val joined = a.map(v => (v.key, v)).join(bWithKey)
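As a concrete sketch, this can be applied to the Score and Age RDDs from the first answer (combining the two answers like this is my illustration; note that before Spark 1.3 pair-RDD methods such as join require the SparkContext implicits):

import org.apache.spark.SparkContext._ // enables join and other PairRDDFunctions
val scoresByName = scores.map(s => (s.name, s)) // key each Score by name
val agesByName = ages.map(a => (a.name, a))     // key each Age by name
// join yields (name, (Age, Score)) pairs; flatten them into plain tuples
val flat = agesByName.join(scoresByName).map { case (name, (a, s)) => (name, a.age, s.score) }
flat.collect().foreach(println)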
Upvotes: 2