Y0gesh Gupta

Reputation: 2204

How to join two HashMaps by a key in Spark RDD

I have two RDDs, each in the format of

{string, HashMap[long,object]}

I want to perform a join operation on them such that the HashMaps for the same key get merged, in Scala.

RDD1 -> {string1, HashMap[{long a, object}, {long b, object}]}
RDD2 -> {string1, HashMap[{long c, object}]}

After joining the two RDDs, the result should look like

RDD -> {string1, HashMap[{long a, object}, {long b, object}, {long c, object}]}

Any help will be appreciated; I am also kind of new to Scala and Spark.

Upvotes: 1

Views: 4862

Answers (2)

DNA

Reputation: 42617

Update: a simpler way is just to take the union and then reduce by key:

(rdd1 union rdd2).reduceByKey(_++_)
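
For example, a minimal sketch in the Spark shell (assuming sc is the SparkContext and using sample data in the same shape as the example below):

// Two pair RDDs whose values are Maps
val rdd1 = sc.parallelize(List("a" -> Map(1 -> "one", 2 -> "two")))
val rdd2 = sc.parallelize(List("a" -> Map(3 -> "three")))

// union keeps every (key, Map) pair from both RDDs;
// reduceByKey then merges the Maps that share a key using ++
val merged = (rdd1 union rdd2).reduceByKey(_ ++ _)

merged foreach println
// (a,Map(1 -> one, 2 -> two, 3 -> three))

Note that, unlike join, this also keeps keys that appear in only one of the two RDDs.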

Older solution, just for reference. This can also be done with cogroup, which collects the values for each key from one or both RDDs (whereas join omits values whose key appears in only one of the original RDDs). See the ScalaDoc.

We then concatenate the two groups of values using ++ to form a single collection of values, and finally reduce those values (Maps) to a single Map.

The last two steps can be combined into a single mapValues operation:

Using this data...

val rdd1 = sc.parallelize(List("a"->Map(1->"one", 2->"two")))
val rdd2 = sc.parallelize(List("a"->Map(3->"three")))

...in the Spark shell:

val x = (rdd1 cogroup rdd2).mapValues{ case (a,b) => (a ++ b).reduce(_++_)}

x foreach println

> (a,Map(1 -> one, 2 -> two, 3 -> three))

Upvotes: 4

cybye

Reputation: 1171

You can do this by joining the two RDDs and applying a merge function to the tuples of maps:

def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))]

Return an RDD containing all pairs of elements with matching keys in this and other. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in this and (k, v2) is in other. Performs a hash join across the cluster.

def mapValues[U](f: (V) ⇒ U): RDD[(K, U)]

Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.

Assume there is a merge function like the one discussed in Best way to merge two maps and sum the values of same key?

def merge[K](a: K, b: K): K = ???

which could look like

def merge[K, V](a: Map[K, V], b: Map[K, V]): Map[K, V] = a ++ b

Given that, the RDDs can be joined first:

val joined = RDD1.join(RDD2) 

and then mapped:

val mapped = joined.mapValues(v => merge(v._1, v._2))

The result is an RDD of (key, merged Map) pairs.
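
Putting these pieces together, a minimal self-contained sketch (with hypothetical sample data shaped like the question's; note that join only keeps keys present in both RDDs):

// Hypothetical sample data mirroring the question's shape
val RDD1 = sc.parallelize(List("string1" -> Map(1L -> "objA", 2L -> "objB")))
val RDD2 = sc.parallelize(List("string1" -> Map(3L -> "objC")))

// Merge two Maps; values from b win on duplicate keys
def merge[K, V](a: Map[K, V], b: Map[K, V]): Map[K, V] = a ++ b

// Join on the String key, then merge each pair of Maps into one
val joined = RDD1.join(RDD2)
val mapped = joined.mapValues { case (m1, m2) => merge(m1, m2) }

mapped foreach println
// (string1,Map(1 -> objA, 2 -> objB, 3 -> objC))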

Upvotes: 1
