giaosudau

Reputation: 2251

How to use reduceByKey to add a value into a Set in Scala Spark?

After mapping, my RDD contains

((_id_1, section_id_1), (_id_1, section_id_2), (_id_2, section_3), (_id_2, section_4))

I want to use reduceByKey to collect the values for each key into a Set:

((_id_1, Set(section_id_1, section_id_2)), (_id_2, Set(section_3, section_4)))

My current code is:

val collectionReduce = collection_filtered.map(item => {
  val extras = item._2.get("extras")
  var section_id = ""
  var extras_id = ""
  if (extras != null) {
    val extras_parse = extras.asInstanceOf[BSONObject]
    section_id = extras_parse.get("guid").toString
    extras_id = extras_parse.get("id").toString
  }
  (extras_id, Set(section_id))
}).groupByKey().collect()

The output I get instead is

((_id_1, (Set(section_1), Set(section_2))), (_id_2, (Set(section_3), Set(section_4))))

How do I fix that?

Upvotes: 4

Views: 4833

Answers (2)

ccheneson

Reputation: 49410

Here is an alternative using groupByKey and mapValues:

val rdd = sc.parallelize(List(("_id_1", "section_id_1"), ("_id_1", "section_id_2"), ("_id_2", "section_3"), ("_id_2", "section_4")))

rdd.groupByKey().mapValues( v => v.toSet).foreach(println)

Here is another alternative using combineByKey, which is recommended over groupByKey because it merges values within each partition before the shuffle:

rdd.combineByKey(
  (value: String) => Set(value),
  (x: Set[String], value: String) => x + value,
  (x: Set[String], y: Set[String]) => x ++ y
).foreach(println)
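
To make the roles of the three functions passed to combineByKey more concrete, here is a minimal plain-Scala sketch that needs no Spark. It only imitates the idea: values are first combined inside each partition, and the per-partition sets are merged afterwards. The object name, the helper functions, and the two hand-made "partitions" are assumptions for illustration; Spark's actual partitioning and execution are more involved.

```scala
// Plain-Scala imitation of what the three combineByKey functions do.
// All names here are hypothetical; this is not Spark's implementation.
object CombineByKeySketch {
  type Sections = Set[String]

  // createCombiner: called for the first value of a key within a partition
  val createCombiner: String => Sections = value => Set(value)
  // mergeValue: adds a further value to the set already built in that partition
  val mergeValue: (Sections, String) => Sections = (acc, value) => acc + value
  // mergeCombiners: merges sets built for the same key in different partitions
  val mergeCombiners: (Sections, Sections) => Sections = (x, y) => x ++ y

  // Build one set per key inside a single (hypothetical) partition.
  def combinePartition(partition: Seq[(String, String)]): Map[String, Sections] =
    partition.foldLeft(Map.empty[String, Sections]) { case (acc, (key, value)) =>
      val updated = acc.get(key) match {
        case Some(set) => mergeValue(set, value)
        case None      => createCombiner(value)
      }
      acc.updated(key, updated)
    }

  // Merge the per-partition maps into the final result.
  def mergePartitions(parts: Seq[Map[String, Sections]]): Map[String, Sections] =
    parts.foldLeft(Map.empty[String, Sections]) { (acc, part) =>
      part.foldLeft(acc) { case (merged, (key, set)) =>
        merged.updated(key, merged.get(key).map(mergeCombiners(_, set)).getOrElse(set))
      }
    }

  def main(args: Array[String]): Unit = {
    // Assumed split of the sample data into two partitions.
    val partition1 = Seq(("_id_1", "section_id_1"), ("_id_2", "section_3"))
    val partition2 = Seq(("_id_1", "section_id_2"), ("_id_2", "section_4"))
    val result = mergePartitions(Seq(partition1, partition2).map(combinePartition))
    result.foreach(println)
  }
}
```

Because each partition is reduced to one set per key before anything is merged, only those sets have to be exchanged, whereas groupByKey ships every individual value.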

Upvotes: 2

Peter Neyens

Reputation: 9820

You can use reduceByKey, with ++ as the reduce function to combine the sets.

val rdd = sc.parallelize((1, Set("A")) :: (2, Set("B")) :: (2, Set("C")) :: Nil)
val reducedRdd = rdd.reduceByKey(_ ++ _)
reducedRdd.collect()
// Array((1,Set(A)), (2,Set(B, C)))

In your case:

collection_filtered.map(item => {
  // ...
  (extras_id, Set(section_id))
}).reduceByKey(_ ++ _).collect()
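
For anyone who wants to try this end to end, the following is a minimal self-contained sketch that applies the same reduceByKey(_ ++ _) idea to the sample pairs from the question. It assumes Spark is on the classpath and runs with a local master; the object name, the helper sectionsById, and the app name are made up for illustration, and the BSON parsing from the question is left out.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical wrapper object; only the RDD calls come from Spark itself.
object SectionsById {
  // Wrap each section id in a one-element Set, then union the sets per key.
  def sectionsById(pairs: RDD[(String, String)]): RDD[(String, Set[String])] =
    pairs.mapValues(section => Set(section)).reduceByKey(_ ++ _)

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for experimenting on one machine.
    val conf = new SparkConf().setAppName("sections-by-id").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val pairs = sc.parallelize(Seq(
        ("_id_1", "section_id_1"),
        ("_id_1", "section_id_2"),
        ("_id_2", "section_3"),
        ("_id_2", "section_4")
      ))
      // One (id, Set of section ids) pair per distinct id.
      sectionsById(pairs).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

The order of the collected pairs, and of the elements printed inside each Set, is not guaranteed.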

Upvotes: 4
