armnotstrong

Reputation: 9065

What's the best practice for merging RDDs in Scala?

I have multiple RDDs as results and want to merge them. They all have the same format:

RDD(id, HashMap[String, HashMap[String, Int]])
    ^             ^        ^
    |             |        |
  identity     category   distribution of the category

Here is an example element of such an RDD:

(1001, {age={10=3,15=5,16=8, ...}})

The String key of the outer HashMap[String, HashMap] is the category of the statistic, and the inner HashMap[String, Int] is the distribution of that category. After calculating the distribution for each category, I want to merge the results by identity so that I can store them in a database. Here is what I have currently:
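For readers new to Scala, the record shape described above can be written down as an immutable-map type alias. This is a minimal sketch that uses Scala's immutable Map rather than the asker's java.util.HashMap (an assumption, not the original type); the names RecordShape, Stats and example are hypothetical, and only the three age buckets actually shown in the example are included, the elided "..." buckets are left out.

```scala
object RecordShape {
  // category -> (bucket -> count), e.g. "age" -> ("10" -> 3)
  type Stats = Map[String, Map[String, Int]]

  // The example record (1001, {age={10=3,15=5,16=8, ...}}), limited to the
  // buckets actually shown; the identity is a String, as in mergeRDD's signature
  val example: (String, Stats) =
    ("1001", Map("age" -> Map("10" -> 3, "15" -> 5, "16" -> 8)))
}
```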

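import java.util
import org.apache.spark.rdd.RDD

// Merges two RDDs keyed by identity by copying m2's entries into m1.
// join is an inner join: only ids present in both RDDs are kept.
def mergeRDD(rdd1: RDD[(String, util.HashMap[String, Object])],
             rdd2: RDD[(String, util.HashMap[String, Object])]): RDD[(String, util.HashMap[String, Object])] = {
  rdd1.join(rdd2).map { case (id, (m1, m2)) =>
    m1.putAll(m2) // mutates m1 in place
    (id, m1)
  }
}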
val mergedRDD = mergeRDD(provinceRDD, mergeRDD(mergeRDD(levelRDD, genderRDD), actionTypeRDD))

I wrote the function mergeRDD so that I can merge two RDDs at a time, but it doesn't feel very elegant. As a Scala newbie, I'd appreciate any suggestions.

Upvotes: 0


Answers (2)

Shyamendra Solanki

Reputation: 8851

You could replace java.util.HashMap with scala.collection.immutable.Map.
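A sketch of that conversion, assuming the nested maps really are java.util.HashMap[String, java.util.HashMap[String, Int]] as in the question's diagram (the posted code declares the values as Object, which would need a cast first) and assuming Scala 2.13's scala.jdk.CollectionConverters. The object and method names are hypothetical.

```scala
import java.util
import scala.jdk.CollectionConverters._

object ToScalaMaps {
  type Stats = Map[String, Map[String, Int]]

  // Convert the nested java.util.HashMap form into immutable Scala maps:
  // asScala gives a mutable view, toMap copies it into an immutable Map
  def toStats(m: util.HashMap[String, util.HashMap[String, Int]]): Stats =
    m.asScala.map { case (category, dist) => category -> dist.asScala.toMap }.toMap
}
```

In Spark this would typically be applied per record, e.g. with mapValues on each RDD, before merging.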

From there:

val rdds      = List(provinceRDD, levelRDD, genderRDD, actionTypeRDD)
val unionRDD  = rdds.reduce(_ ++ _)          // RDD ++ is union
val mergedRDD = unionRDD.reduceByKey(_ ++ _) // Map ++ combines the categories of each id
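The same union-then-reduceByKey logic can be tried on plain Scala collections without a cluster. In this sketch each Seq stands in for one RDD and groupMapReduce plays the role of reduceByKey; it assumes Scala 2.13, and UnionReduceSketch, mergeAll and the sample categories in the usage are hypothetical.

```scala
object UnionReduceSketch {
  type Stats = Map[String, Map[String, Int]]

  // Local analogue of rdds.reduce(_ ++ _).reduceByKey(_ ++ _):
  // concatenate all (id, stats) pairs, group them by id, and merge the
  // per-id maps with ++ (on a duplicate category one inner map wins).
  // Like rdds.reduce, this throws on an empty list.
  def mergeAll(parts: List[Seq[(String, Stats)]]): Map[String, Stats] =
    parts.reduce(_ ++ _).groupMapReduce(_._1)(_._2)(_ ++ _)
}
```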

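This assumes that categories don't overlap between the RDDs; if the same id has the same category in two RDDs, ++ keeps only one of the two inner distributions.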
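If categories can overlap, one option is a nested merge that combines the inner distributions instead of overwriting them. This sketch assumes that overlapping distributions should be combined by adding the counts of shared buckets; DeepMerge, mergeDist and mergeStats are hypothetical names.

```scala
object DeepMerge {
  type Stats = Map[String, Map[String, Int]]

  // Merge two distributions, adding the counts of buckets present in both
  def mergeDist(d1: Map[String, Int], d2: Map[String, Int]): Map[String, Int] =
    d2.foldLeft(d1) { case (acc, (bucket, n)) =>
      acc.updated(bucket, acc.getOrElse(bucket, 0) + n)
    }

  // Merge two per-id stats maps; a category present in both is combined
  // with mergeDist, a category present in only one is copied as-is
  def mergeStats(s1: Stats, s2: Stats): Stats =
    s2.foldLeft(s1) { case (acc, (category, dist)) =>
      acc.updated(category, acc.get(category).fold(dist)(mergeDist(_, dist)))
    }
}
```

In Spark it should then be usable as unionRDD.reduceByKey(DeepMerge.mergeStats _). Because addition is associative and commutative, the result does not depend on the order in which reduceByKey combines the values.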

Upvotes: 0

Abhishek Anand

Reputation: 1992

I don't see an easy way to do this without a performance cost. The reason is that you are not simply merging two RDDs; you also want each HashMap to hold the consolidated values after the union.

Also, your merge function is wrong: join performs an inner join, so any id that appears in one RDD but not in the other is dropped from the result.
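To see the difference on local data, here is a sketch that mimics an inner join and a full outer join with plain Scala maps instead of RDDs; JoinSemantics, innerMerge, outerMerge and the sample values are hypothetical. (If you prefer to keep the join-based shape in Spark, PairRDDFunctions also provides fullOuterJoin, which returns an Option for each side.)

```scala
object JoinSemantics {
  type Stats = Map[String, Map[String, Int]]
  private val empty: Stats = Map.empty

  // Like rdd1.join(rdd2): only ids present in both inputs survive
  def innerMerge(a: Map[String, Stats], b: Map[String, Stats]): Map[String, Stats] =
    (a.keySet intersect b.keySet).iterator.map(id => id -> (a(id) ++ b(id))).toMap

  // Like a full outer join: ids present in either input survive
  def outerMerge(a: Map[String, Stats], b: Map[String, Stats]): Map[String, Stats] =
    (a.keySet union b.keySet).iterator.map { id =>
      id -> (a.getOrElse(id, empty) ++ b.getOrElse(id, empty))
    }.toMap
}
```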

The correct approach would be something like:

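val mergedRDD = rdd1.union(rdd2).reduceByKey { (m1, m2) =>
  // putAll returns void, so the merged map must be returned explicitly;
  // copying m1 first avoids mutating maps held by the input RDDs
  val merged = new util.HashMap[String, Object](m1)
  merged.putAll(m2)
  merged
}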

Upvotes: 2
