Reputation: 659
I have Scala code like the following:
val new_rdd = rdd1.join(rdd2).map(x => (x._2._2, x._2._1))
I am not sure, but I think I could use a case class to avoid the use of x._2._2. Could you help me?
In this case, rdd1 is RDD[(String, Array[String])] and rdd2 is RDD[(String, Array[Int])].
Upvotes: 0
Views: 148
Reputation: 1528
Something like this could also be done:
case class Items(id: String, items: List[String])
case class ItemCounts(id: String, itemcounts: List[Int])

val rdd1 = sc.parallelize(Seq(
  Items("id1", List("item10", "item2", "item4")),
  Items("id2", List("item4", "item9")),
  Items("id3", List("item1", "item3"))))

val rdd2 = sc.parallelize(Seq(
  ItemCounts("id1", List(100, 200)),
  ItemCounts("id2", List(200, 500, 100, 1100)),
  ItemCounts("id3", List(10))))
Create a pair RDD keyed by id from each RDD so that they can be joined:
val ItemsRDD = rdd1.map(item => (item.id, item))
val ItemsCountsRDD = rdd2.map(itemcnts => (itemcnts.id, itemcnts))

ItemsRDD.join(ItemsCountsRDD)
  .map(x => (x._2._1.id, x._2._2.itemcounts))
  .collect.foreach(println)
The result looks like this (row order may vary):
(id3,List(10))
(id1,List(100, 200))
(id2,List(200, 500, 100, 1100))
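If the goal is to stop using positional accessors after the join as well, one option is to map each joined row into its own case class. The following is only a sketch: `Joined` is a hypothetical record type that does not appear in the answer above, and plain Scala collections stand in for the RDDs so that it runs without a Spark context. The `(key, (left, right))` shape is the same one the join produces.

```scala
object JoinedRecordSketch {
  case class Items(id: String, items: List[String])
  case class ItemCounts(id: String, itemcounts: List[Int])

  // Hypothetical record type for one joined row (an assumption, not from the answer).
  case class Joined(id: String, items: List[String], itemcounts: List[Int])

  val items = Seq(
    Items("id1", List("item10", "item2", "item4")),
    Items("id2", List("item4", "item9")))

  val counts = Seq(
    ItemCounts("id1", List(100, 200)),
    ItemCounts("id2", List(200, 500, 100, 1100)))

  // Local stand-in for an inner join on id: pair each Items with the
  // ItemCounts that shares its id, producing (key, (left, right)) tuples.
  val joinedPairs: Seq[(String, (Items, ItemCounts))] =
    for {
      i <- items
      c <- counts
      if i.id == c.id
    } yield (i.id, (i, c))

  // Destructure the tuple shape once, then work with named fields only.
  val joined: Seq[Joined] =
    joinedPairs.map { case (id, (i, c)) => Joined(id, i.items, c.itemcounts) }

  def main(args: Array[String]): Unit =
    joined.foreach(j => println(s"${j.id} -> ${j.itemcounts}"))
}
```

With Spark, the same `map { case (id, (i, c)) => ... }` body should apply to the result of `ItemsRDD.join(ItemsCountsRDD)`, since the element type has the same shape.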
Upvotes: 1
Reputation: 27356
This isn't a case class, but I think this may be what you are looking for:
val new_rdd = rdd1.join(rdd2).map{ case (_, (a, b)) => (b, a) }
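To see that the pattern match is just a more readable form of the accessor version, here is a small sketch that runs both on the same data. It assumes a plain `Seq` as a stand-in for the joined RDD, and the names `joined`, `byAccessor`, `byPattern`, and `normalize` are made up for illustration.

```scala
object DestructureSketch {
  // Stand-in for rdd1.join(rdd2): each element is (key, (leftValue, rightValue)).
  val joined: Seq[(String, (Array[String], Array[Int]))] = Seq(
    ("k1", (Array("a", "b"), Array(1, 2))),
    ("k2", (Array("c"), Array(3))))

  // Positional accessors, as in the question.
  val byAccessor: Seq[(Array[Int], Array[String])] =
    joined.map(x => (x._2._2, x._2._1))

  // Pattern match, as in this answer: names replace _2._2 and _2._1.
  val byPattern: Seq[(Array[Int], Array[String])] =
    joined.map { case (_, (strings, ints)) => (ints, strings) }

  // Arrays compare by reference, so convert to List before comparing contents.
  def normalize(rows: Seq[(Array[Int], Array[String])]): Seq[(List[Int], List[String])] =
    rows.map { case (ints, strings) => (ints.toList, strings.toList) }

  def main(args: Array[String]): Unit =
    println(normalize(byAccessor) == normalize(byPattern))
}
```

The `{ case ... }` block is a pattern-matching function literal, so the tuple is taken apart in the parameter position and each part gets a name; the `_` ignores the join key.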
Upvotes: 4