Reputation: 421
I don't understand what is wrong with the code below. It works fine, and the HashMap typeMap gets updated, as long as my input DataFrame is not partitioned. But when the same code runs against a partitioned DataFrame, typeMap stays empty and is never updated. What is wrong with this code? Thanks for all your help.
var typeMap = new mutable.HashMap[String, (String, Array[String])]

case class Combiner(<...>, mapTypes: mutable.HashMap[String, (String, Array[String])]) {

  def execute() {
    <...>
    val combinersResult = dfInput.rdd.aggregate(combiners.toArray)(incrementCount, mergeCount)
  }

  def updateTypes(arr: Array[String], tempMapTypes: mutable.HashMap[String, (String, Array[String])]): Unit = {
    <...>
    typeMap ++= tempMapTypes
  }

  def incrementCount(combiners: Array[Combiner], row: Row): Array[Combiner] = {
    for (i <- 0 until row.length) {
      val array = getMyType(row(i), tempMapTypes)
      combiners(i).updateTypes(array, tempMapTypes)
    }
    combiners
  }
}
Upvotes: 0
Views: 185
Reputation: 30320
It is a really bad idea to use mutable state in distributed computing. With Spark in particular, RDD operations are shipped from the driver to the executors and run in parallel on the different machines in the cluster. Each task works on its own deserialized copy of the closure, so updates made to your mutable.HashMap are never sent back to the driver, and you are stuck with the empty map that was constructed on the driver in the first place.
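Here is a minimal sketch of that failure mode, stripped down to a word count (the object and variable names are just illustrative, not taken from your code):

import scala.collection.mutable
import org.apache.spark.sql.SparkSession

object MutableStateDemo {
  def main(args: Array[String]): Unit = {
    // Master/deploy mode are assumed to come from spark-submit.
    val spark = SparkSession.builder().appName("demo").getOrCreate()
    val counts = mutable.HashMap.empty[String, Int]

    spark.sparkContext.parallelize(Seq("a", "b", "a")).foreach { key =>
      // This runs on an executor against a deserialized copy of `counts`;
      // the driver's map is never touched.
      counts(key) = counts.getOrElse(key, 0) + 1
    }

    // On a cluster this prints an empty Map: the executor-side updates were
    // made to task-local copies and are discarded when the tasks finish.
    println(counts)
    spark.stop()
  }
}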
So you need to completely rethink your data structures, preferring immutability, and remember that operations running on the executors are independent and parallel.
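For example, here is a sketch of the same word count done with an immutable Map and RDD.aggregate (again, the names are illustrative): seqOp folds each row into a partition-local map on the executors, combOp merges the per-partition maps, and the merged result is what aggregate returns to the driver, so nothing relies on a mutation being visible across machines.

import org.apache.spark.sql.SparkSession

object ImmutableAggregateDemo {
  def main(args: Array[String]): Unit = {
    // Master/deploy mode are assumed to come from spark-submit.
    val spark = SparkSession.builder().appName("demo").getOrCreate()
    val rdd = spark.sparkContext.parallelize(Seq("a", "b", "a"))

    val counts: Map[String, Int] = rdd.aggregate(Map.empty[String, Int])(
      // seqOp: runs on the executors, folds one element into a partition-local map.
      (acc, key) => acc + (key -> (acc.getOrElse(key, 0) + 1)),
      // combOp: merges two per-partition maps; the final merged map is
      // the return value of aggregate on the driver.
      (m1, m2) => m2.foldLeft(m1) { case (acc, (k, v)) =>
        acc + (k -> (acc.getOrElse(k, 0) + v))
      }
    )

    println(counts) // Map(a -> 2, b -> 1)
    spark.stop()
  }
}

The same pattern applies to your Combiner: have incrementCount and mergeCount build and return their accumulator values instead of writing into a shared var, and take the map you need from the return value of aggregate on the driver.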
Upvotes: 2