Garipaso

Reputation: 421

Scala hashmap not getting appended

I don't understand what is wrong with the code below. It works fine and the hashmap typeMap gets updated when my input data frame is not partitioned, but when the same code runs against a partitioned data frame, typeMap stays empty and is never updated. What is wrong with this code? Thanks for all your help.

var typeMap = new mutable.HashMap[String, (String, Array[String])]

case class Combiner(<...>, mapTypes: mutable.HashMap[String, (String, Array[String])]) {
    def execute(): Unit = {
        <...>
        val combinersResult = dfInput.rdd.aggregate(combiners.toArray)(incrementCount, mergeCount)
    }

    def updateTypes(arr: Array[String], tempMapTypes: mutable.HashMap[String, (String, Array[String])]): Unit = {
        <...>
        typeMap ++= tempMapTypes
    }

    def incrementCount(combiners: Array[Combiner], row: Row): Array[Combiner] = {
        for (i <- 0 until row.length) {
            val array = getMyType(row(i), tempMapTypes)
            combiners(i).updateTypes(array, tempMapTypes)
        }
        combiners
    }
}

Upvotes: 0

Views: 185

Answers (1)

Vidya

Reputation: 30320

It is a really bad idea to use mutable values in distributed computing. With Spark in particular, RDD operations are shipped from the driver to the executors and are executed in parallel on all the different machines in the cluster. Updates made to your mutable.HashMap are never sent back to the driver, so you are stuck with the empty map that got constructed on the driver in the first place.
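You can see the same effect without Spark at all: Spark serializes the closure (together with any `mutable.HashMap` it captures), ships the bytes to each executor, and each executor mutates its own deserialized copy, so the original on the driver never changes. Here is a minimal sketch reproducing that copy-on-serialize behavior with plain Java serialization; the object and method names are just for illustration:

```scala
import java.io._

object ClosureCopyDemo {
  // Serialize a value to bytes and read it back, the way a closure and
  // its captured state travel from the driver to an executor.
  def roundTrip[T <: Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val driverMap = scala.collection.mutable.HashMap[String, Int]()
    // "Ship" the map to an executor and mutate the copy there.
    val executorCopy = roundTrip(driverMap)
    executorCopy += ("count" -> 1)
    // The driver's original map was never touched.
    println(driverMap.isEmpty)
  }
}
```

The mutation lands on `executorCopy`, not on `driverMap`, which is exactly why typeMap in the question stays empty once the work is spread over partitions.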

So you need to rethink your data structures completely: prefer immutability, and remember that operations firing on the executors are independent and parallel.
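Concretely, this means following the contract aggregate already gives you: a seqOp that folds each row into a partition-local immutable result, and a combOp that merges two partition results, with the final merged value returned to the driver instead of mutated in place. Below is a dependency-free sketch of that pattern on plain Scala collections (no Spark; the `(column, value)` row shape and all names are hypothetical stand-ins for the question's types):

```scala
object ImmutableAggregate {
  type TypeMap = Map[String, (String, Array[String])]

  // seqOp: fold one row into a partition-local immutable map.
  def seqOp(acc: TypeMap, row: (String, String)): TypeMap = {
    val (col, value) = row
    val arr = acc.get(col).map(_._2 :+ value).getOrElse(Array(value))
    acc.updated(col, (value, arr))
  }

  // combOp: merge two partition results, concatenating the value arrays.
  def combOp(a: TypeMap, b: TypeMap): TypeMap =
    b.foldLeft(a) { case (m, (k, (t, arr))) =>
      m.get(k) match {
        case Some((_, existing)) => m.updated(k, (t, existing ++ arr))
        case None                => m.updated(k, (t, arr))
      }
    }

  def main(args: Array[String]): Unit = {
    // Simulate two partitions being folded independently, then merged.
    val part1 = List(("age", "int"), ("name", "string"))
    val part2 = List(("age", "long"))
    val merged = combOp(
      part1.foldLeft(Map.empty: TypeMap)(seqOp),
      part2.foldLeft(Map.empty: TypeMap)(seqOp))
    println(merged.map { case (k, (t, arr)) => (k, (t, arr.toList)) })
  }
}
```

With Spark, the same seqOp/combOp pair would be passed to `rdd.aggregate(Map.empty: TypeMap)(seqOp, combOp)`, and the merged map comes back as the return value on the driver; nothing shared needs to be mutated.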

Upvotes: 2
