aola

Reputation: 87

Merge sets' elements with HashMap's keys in Scala

I hope there is an easy way to solve this. I have two RDDs:

g.vertices
(4,Set(5, 3))
(0,Set(1, 4))
(1,Set(2))
(6,Set())
(3,Set(0))
(5,Set(2))
(2,Set(1))

maps
Map(4 -> Set(5, 3))
Map(0 -> Set(1, 4))
Map(1 -> Set(2))
Map(6 -> Set())
Map(3 -> Set(0))
Map(5 -> Set(2))
Map(2 -> Set(1))

How can I do something like this?

(4,Map(5 -> Set(2), 3 -> Set(0)))
(0,Map(1 -> Set(2), 4 -> Set(5, 3)))
(1,Map(2 -> Set(1)))
(6,Map())
(3,Map(0 -> Set(1, 4)))
(5,Map(2 -> Set(1)))
(2,Map(1 -> Set(2)))

I want to match each element of a vertex's set against the keys of the maps, so that every set element is replaced by the map entry whose key equals that element.

I thought about

val maps = g.vertices.map { case (id, attr) => HashMap(id -> attr) }

g.mapVertices{case (id, data) => data.map{case vId => maps.
  map { case i if i.keySet.contains(vId) => HashMap(vId -> i.values) } }}

but I get this error:

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

Upvotes: 0

Views: 201

Answers (1)

Cyrille Corpet

Reputation: 5315

This is a simple use case for join. In the following code, A is the type of the keys in g.vertices, K and V are the key and value types for maps:

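import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// The ClassTag bounds let the implicit pair-RDD operations (join, aggregateByKey) resolve for generic types
def joinByKeys[A: ClassTag, K: ClassTag, V: ClassTag](sets: RDD[(A, Set[K])], maps: RDD[Map[K, V]]): RDD[(A, Map[K, V])] = {
  val flattenSets = sets.flatMap(p => p._2.map(_ -> p._1)) // create an (element, vertexId) pair for each element of a vertex's set
  val flattenMaps = maps.flatMap(identity)                 // create an RDD with all (key, value) entries of the maps
  flattenMaps.join(flattenSets).map {                      // join them by their key
    case (k, (v, a)) => (a, (k, v))                        // reorder to put the vertexId as the key
  }.aggregateByKey(Map.empty[K, V])(_ + _, _ ++ _)         // aggregate the entries into one map per vertex
}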
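
To see what the flatten / join / aggregate steps produce on the data from the question, here is a minimal sketch that mirrors them with plain Scala collections instead of RDDs, so it runs without a Spark context. The names JoinSketch, joinLocally and withEmpty are hypothetical, and the sketch assumes every key appears in at most one of the maps, as in the question's data.

```scala
object JoinSketch {
  // Hypothetical local analogue of joinByKeys: same steps, plain collections instead of RDDs.
  def joinLocally[A, K, V](sets: Seq[(A, Set[K])], maps: Seq[Map[K, V]]): Map[A, Map[K, V]] = {
    // One (element, vertexId) pair per element of each vertex's set.
    val flattenSets: Seq[(K, A)] = sets.flatMap { case (a, ks) => ks.toSeq.map(k => k -> a) }
    // All (key, value) entries of the maps, used as a lookup table (assumes unique keys).
    val lookup: Map[K, V] = maps.flatMap(_.toSeq).toMap
    // Inner join on the element: pairs whose element has no entry are dropped.
    val joined: Seq[(A, (K, V))] =
      flattenSets.flatMap { case (k, a) => lookup.get(k).map(v => a -> (k -> v)) }
    // Regroup by vertex id and collect the entries into one map per vertex.
    joined.groupBy(_._1).map { case (a, pairs) => a -> pairs.map(_._2).toMap }
  }

  // Data copied from the question.
  val vertices: Seq[(Int, Set[Int])] = Seq(
    4 -> Set(5, 3), 0 -> Set(1, 4), 1 -> Set(2), 6 -> Set.empty[Int],
    3 -> Set(0), 5 -> Set(2), 2 -> Set(1)
  )
  val maps: Seq[Map[Int, Set[Int]]] = vertices.map { case (id, attr) => Map(id -> attr) }

  val result: Map[Int, Map[Int, Set[Int]]] = joinLocally(vertices, maps)

  // Vertices with an empty set produce no pairs, so they vanish from an inner join.
  // Re-adding them with an empty map gives the (6,Map()) row from the question.
  val withEmpty: Map[Int, Map[Int, Set[Int]]] =
    vertices.map { case (id, _) => id -> result.getOrElse(id, Map.empty[Int, Set[Int]]) }.toMap
}
```

Note that vertex 6 is missing from result because its set is empty. If that row is needed in the Spark version, one option would be a leftOuterJoin of the original vertices against the joined result, filling missing values with an empty map.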

Upvotes: 1
