Reputation: 2251
After I map my RDD to
((_id_1, section_id_1), (_id_1, section_id_2), (_id_2, section_3), (_id_2, section_4))
I want to reduceByKey it to
((_id_1, Set(section_id_1, section_id_2)), (_id_2, Set(section_3, section_4)))
val collectionReduce = collection_filtered.map(item => {
  val extras = item._2.get("extras")
  var section_id = ""
  var extras_id = ""
  if (extras != null) {
    val extras_parse = extras.asInstanceOf[BSONObject]
    section_id = extras_parse.get("guid").toString
    extras_id = extras_parse.get("id").toString
  }
  (extras_id, Set(section_id))
}).groupByKey().collect()
My output is
((_id_1, (Set(section_id_1), Set(section_id_2))), (_id_2, (Set(section_3), Set(section_4))))
How do I fix that?
Upvotes: 4
Views: 4833
Reputation: 49410
Here is an alternative with groupByKey/mapValues:
val rdd = sc.parallelize(List(("_id_1", "section_id_1"), ("_id_1", "section_id_2"), ("_id_2", "section_3"), ("_id_2", "section_4")))
rdd.groupByKey().mapValues(_.toSet).foreach(println)
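For the sample RDD above, this prints the target pairs (line order may vary across partitions):
(_id_1,Set(section_id_1, section_id_2))
(_id_2,Set(section_3, section_4))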
Here is another alternative using combineByKey (recommended over groupByKey, since it merges values within each partition before shuffling):
rdd.combineByKey(
  (value: String) => Set(value),                // createCombiner: wrap the first value for a key
  (x: Set[String], value: String) => x + value, // mergeValue: add a value within a partition
  (x: Set[String], y: Set[String]) => x ++ y    // mergeCombiners: union Sets across partitions
).foreach(println)
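A closely related option, not in the original answer, is aggregateByKey, which does the same per-partition merging but takes an explicit zero value; a minimal sketch on the same rdd:
rdd.aggregateByKey(Set.empty[String])(
  (acc, value) => acc + value, // merge one value into the partition-local Set
  (a, b) => a ++ b             // union the Sets across partitions
).foreach(println)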
Upvotes: 2
Reputation: 9820
You can use reduceByKey by simply using ++ to combine the sets.
val rdd = sc.parallelize((1, Set("A")) :: (2, Set("B")) :: (2, Set("C")) :: Nil)
val reducedRdd = rdd.reduceByKey(_ ++ _)
reducedRdd.collect()
// Array((1,Set(A)), (2,Set(B, C)))
In your case:
collection_filtered.map(item => {
  // ... build extras_id and section_id as before
  (extras_id, Set(section_id))
}).reduceByKey(_ ++ _).collect()
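Assembled with the mapper from the question (BSON handling unchanged), a sketch of the full pipeline:
val collectionReduce = collection_filtered.map(item => {
  val extras = item._2.get("extras")
  var section_id = ""
  var extras_id = ""
  if (extras != null) {
    val extras_parse = extras.asInstanceOf[BSONObject]
    section_id = extras_parse.get("guid").toString
    extras_id = extras_parse.get("id").toString
  }
  (extras_id, Set(section_id)) // one single-element Set per record
}).reduceByKey(_ ++ _)         // union the Sets per key
  .collect()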
Upvotes: 4