phattyD

Reputation: 205

How to merge/join Spark/Scala RDD to List so each value in RDD gets a new row with each List item

Let's say I have a List[String] and I want to merge it with an RDD so that each object in the RDD is paired with every value in the List:

val myBands: List[String] = List("Band1", "Band2")

Table: BandMembers

| name  | instrument |
| ----- | ---------- |
| slash | guitar     |
| axl   | vocals     |

case class BandMembers(name: String, instrument: String)
val myRDD = BandMembersTable.map(a => BandMembers(a.name, a.instrument))
// join myRDD to myBands
// how do I do this?
// val result = myRDD.join/merge/union(myBands)

Desired result:

| name  | instrument | band  |
| ----- | ---------- | ----- |
| slash | guitar     | Band1 |
| slash | guitar     | Band2 |
| axl   | vocals     | Band1 |
| axl   | vocals     | Band2 |

I'm not sure what the best way to do this in Spark/Scala is. I know I can convert to a DataFrame and use Spark SQL to do the join, but I suspect there is a better way that works directly with the RDD and the List.

Upvotes: 0

Views: 414

Answers (2)

srvndr

Reputation: 33

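Consider using RDD zip for this. From the official docs:

RDD<scala.Tuple2<T,U>> zip(RDD other, scala.reflect.ClassTag evidence$11)
Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).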
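A minimal sketch of what zip does on the question's data, assuming Spark is on the classpath and a local SparkSession is acceptable for experimenting (the app name and `numSlices = 1` are illustrative choices). Because zip pairs elements by position, it yields one band per member rather than every member/band combination, so on its own it does not produce the desired four-row result.

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation (assumption: Spark is on the classpath).
val spark = SparkSession.builder().master("local[1]").appName("zip-demo").getOrCreate()
val sc = spark.sparkContext

// Both RDDs have 2 elements in a single partition, which is what zip requires.
val members = sc.parallelize(Seq(("slash", "guitar"), ("axl", "vocals")), numSlices = 1)
val bands = sc.parallelize(Seq("Band1", "Band2"), numSlices = 1)

// zip pairs the i-th member with the i-th band: 2 pairs, not 4 combinations.
val zipped = members.zip(bands).collect()

spark.stop()
```

With unequal partition counts or element counts, zip fails at runtime instead of padding or repeating elements.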

Upvotes: 0

kanielc

Reputation: 1322

The style is a bit off here, but assuming you really need RDDs rather than a Dataset:

With RDDs:

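import org.apache.spark.sql.Row

case class BandMembers(name: String, instrument: String)
// assumes `spark` is an existing SparkSession (e.g. in spark-shell) and BandMembersTable is a local Seq
val myRDD = spark.sparkContext.parallelize(BandMembersTable.map(a => BandMembers(a.name, a.instrument)))
val myBands = spark.sparkContext.parallelize(Seq("Band1", "Band2"))
// cartesian pairs every member with every band
val res = myRDD.cartesian(myBands).map { case (a, b) => Row(a.name, a.instrument, b) }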
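Since myBands in the question is a small local List, another RDD-only option (a swapped-in technique, not the cartesian approach above) is to skip the second RDD entirely and flatMap each member into one row per band. This is a sketch assuming a local SparkSession; tuples are used instead of Row so the result stays typed.

```scala
import org.apache.spark.sql.SparkSession

case class BandMembers(name: String, instrument: String)

val spark = SparkSession.builder().master("local[*]").appName("bands-flatmap").getOrCreate()
val sc = spark.sparkContext

val myRDD = sc.parallelize(Seq(BandMembers("slash", "guitar"), BandMembers("axl", "vocals")))
// Small local list: it is captured in the closure and shipped with each task.
val myBands = List("Band1", "Band2")

// Each member expands into one (name, instrument, band) tuple per band.
val res = myRDD.flatMap(m => myBands.map(band => (m.name, m.instrument, band)))

// Sort after collecting, because the order of RDD elements is not something to rely on.
val rows = res.collect().sortBy(t => (t._1, t._3))

spark.stop()
```

If the list were large, wrapping it with `sc.broadcast(myBands)` and reading `.value` inside the closure avoids re-sending it with every task; cartesian remains the natural choice when the second collection is itself an RDD.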

With Dataset:

import spark.implicits._ // needed for .toDS

case class BandMembers(name: String, instrument: String)
val myRDD = BandMembersTable.map(a => BandMembers(a.name, a.instrument)).toDS
val myBands = Seq("Band1", "Band2").toDS
// crossJoin pairs every row of myRDD with every row of myBands
val res = myRDD.crossJoin(myBands)
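A self-contained sketch of the Dataset version, assuming Spark 2.1+ (where crossJoin is available) and a local SparkSession. A Dataset[String] built with toDS has a single column named "value"; renaming it to "band" makes the output match the column names in the question's desired result.

```scala
import org.apache.spark.sql.SparkSession

case class BandMembers(name: String, instrument: String)

val spark = SparkSession.builder().master("local[*]").appName("bands-crossjoin").getOrCreate()
import spark.implicits._ // provides .toDS() on local Seqs

val members = Seq(BandMembers("slash", "guitar"), BandMembers("axl", "vocals")).toDS()
// Rename the default "value" column so the joined result has a "band" column.
val bands = Seq("Band1", "Band2").toDS().withColumnRenamed("value", "band")

// Every member row is combined with every band row.
val res = members.crossJoin(bands)
res.show()

val cols = res.columns.toSeq
val rows = res.collect()
  .map(r => (r.getString(0), r.getString(1), r.getString(2)))
  .sortBy(t => (t._1, t._3))

spark.stop()
```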

Input data:

val BandMembersTable = Seq(BandMembers("a", "b"), BandMembers("c", "d"))
val myBands = Seq("Band1","Band2")

Output with Dataset:

+----+----------+-----+
|name|instrument|value|
+----+----------+-----+
|a   |b         |Band1|
|a   |b         |Band2|
|c   |d         |Band1|
|c   |d         |Band2|
+----+----------+-----+

Printing the RDD result with println (each element is a Row, and the order may differ from the Dataset output):

[a,b,Band1]
[c,d,Band2]
[c,d,Band1]
[a,b,Band2]

Upvotes: 1
