Reputation: 9065
I have several RDDs as results and want to merge them. They all have the same format:
RDD(id, HashMap[String, HashMap[String, Int]])
    ^             ^              ^
    |             |              |
identity      category  distribution of the category
Here is an example of such an RDD:
(1001, {age={10=3,15=5,16=8, ...}})
The first key (a String) of the outer HashMap[String, HashMap] is the category of the statistic, and the inner HashMap[String, Int] is the distribution of that category. After calculating the distribution for each of the various categories, I want to merge them by identity so that I can store the results in a database. Here is what I have so far:
def mergeRDD(rdd1: RDD[(String, util.HashMap[String, Object])],
             rdd2: RDD[(String, util.HashMap[String, Object])]): RDD[(String, util.HashMap[String, Object])] = {
  val mergedRDD = rdd1.join(rdd2).map {
    case (id, (m1, m2)) => {
      m1.putAll(m2)
      (id, m1)
    }
  }
  mergedRDD
}
val mergedRDD = mergeRDD(provinceRDD, mergeRDD(mergeRDD(levelRDD, genderRDD), actionTypeRDD))
I wrote a function mergeRDD so that I can merge two RDDs at a time, but I find it not very elegant. As a newcomer to Scala, I would appreciate any suggestions.
Upvotes: 0
Views: 405
Reputation: 8851
You could replace java.util.HashMap with scala.collection.immutable.Map. From there:
val rdds = List(provinceRDD, levelRDD, genderRDD, actionTypeRDD)
val unionRDD = rdds.reduce(_ ++ _)
val mergedRDD = unionRDD.reduceByKey(_ ++ _)
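This assumes that categories don't overlap between the RDDs. If the same category appears under an id in more than one RDD, ++ keeps only the distribution from the right-hand map.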
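If the assumption above does not hold and the same category can show up in more than one RDD, the inner distributions need to be combined rather than overwritten. Here is a minimal sketch of one way to do that with plain immutable Maps. The names OverlapSketch, mergeDist and mergeStats are made up for illustration, and summing the counts is an assumption about what "merging" two distributions should mean:

```scala
object OverlapSketch {
  type Dist  = Map[String, Int]   // value -> count, e.g. "10" -> 3
  type Stats = Map[String, Dist]  // category -> distribution, e.g. "age" -> {...}

  // Combine two distributions by summing the counts of equal keys
  // (assumption: counts from different RDDs should be added together).
  def mergeDist(a: Dist, b: Dist): Dist =
    b.foldLeft(a) { case (acc, (k, n)) =>
      acc.updated(k, acc.getOrElse(k, 0) + n)
    }

  // Combine two category maps; categories present in both get their
  // distributions merged instead of the right one replacing the left one.
  def mergeStats(a: Stats, b: Stats): Stats =
    b.foldLeft(a) { case (acc, (cat, dist)) =>
      acc.updated(cat, mergeDist(acc.getOrElse(cat, Map.empty[String, Int]), dist))
    }
}
```

With this in place, the last line of the answer could probably become unionRDD.reduceByKey(OverlapSketch.mergeStats), since mergeStats has the (V, V) => V shape that reduceByKey expects.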
Upvotes: 0
Reputation: 1992
I don't see an easy way to achieve this without a performance cost. The reason is that you are not simply merging two RDDs: you want the hash maps for each id to hold the consolidated values after the union.
Also, your merge function is wrong. As written, join performs an inner join, so it drops every id that is present in one RDD but not in the other.
The correct way would be something like this:
val mergedRDD = rdd1.union(rdd2).reduceByKey { (m1, m2) =>
  m1.putAll(m2) // putAll returns Unit, so the merged map must be returned explicitly
  m1
}
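To illustrate the difference between the two approaches without a Spark cluster, here is a small sketch that mimics both on plain Scala collections. Everything in it (JoinVsUnionSketch, innerJoinMerge, unionReduceMerge, the sample data) is hypothetical and only imitates the semantics of join and union + reduceByKey. It also assumes each id appears at most once per input:

```scala
object JoinVsUnionSketch {
  type Stats = Map[String, Map[String, Int]] // category -> distribution
  type Rows  = Seq[(String, Stats)]          // stand-in for RDD[(id, stats)]

  // Imitates rdd1.join(rdd2) followed by merging the two maps.
  // Only ids present on BOTH sides survive (inner join).
  // Assumes each id occurs at most once in b.
  def innerJoinMerge(a: Rows, b: Rows): Map[String, Stats] = {
    val right = b.toMap
    a.collect { case (id, m1) if right.contains(id) => id -> (m1 ++ right(id)) }.toMap
  }

  // Imitates rdd1.union(rdd2).reduceByKey(_ ++ _).
  // Every id from either side is kept.
  def unionReduceMerge(a: Rows, b: Rows): Map[String, Stats] =
    (a ++ b).groupBy(_._1).map { case (id, rows) =>
      id -> rows.map(_._2).reduce(_ ++ _)
    }

  // Made-up sample data: id 1002 has gender statistics but no level statistics.
  val gender: Rows = Seq(
    "1001" -> Map("gender" -> Map("m" -> 4)),
    "1002" -> Map("gender" -> Map("f" -> 2))
  )
  val level: Rows = Seq(
    "1001" -> Map("level" -> Map("3" -> 7))
  )
}
```

Here innerJoinMerge(gender, level) contains only id 1001, while unionReduceMerge(gender, level) keeps both 1001 and 1002, which is the behaviour you want before writing to the database.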
Upvotes: 2