yari

Reputation: 99

How to find Sum at Each partition in Spark

I have created a class and used it to create an RDD. I want to calculate the sum of LoudnessRate (a member of the class) for each partition. This sum will later be used to calculate the mean LoudnessRate per partition. I have tried the following code, but it does not calculate the sum and returns 0.0. My code is

    import org.apache.spark.{SparkConf, SparkContext}
    import scala.util.Random

    object sparkBAT {
      def main(args: Array[String]): Unit = {
        val numPartitions = 3
        val N = 50
        val d = 5
        val MinVal = -10
        val MaxVal = 10
        val conf = new SparkConf().setMaster("local").setAppName("spark Sum")
        val sc = new SparkContext(conf)

        val ba = List.fill(N)(new BAT(d, MinVal, MaxVal))
        val rdd = sc.parallelize(ba, numPartitions)

        // Declare the array that will hold the sum for each partition
        var arrSum = Array.fill(numPartitions)(0.0)
        rdd.mapPartitionsWithIndex((k, iterator) => iterator.map(x => arrSum(k) += x.LoudnessRate)).collect()
        arrSum foreach println
      }
    }


    class BAT(dim: Int, min: Double, max: Double) extends Serializable {
      val random = new Random()
      var position: List[Double] = List.fill(dim)(random.nextDouble() * (max - min) + min)
      var velocity: List[Double] = List.fill(dim)(math.random)
      var PulseRate: Double = 0.1
      var LoudnessRate: Double = 0.95
      var frequency: Double = math.random
      var fitness: Double = math.random
      var BestPosition: List[Double] = List.fill(dim)(math.random)
      var BestFitness: Double = math.random
    }

Upvotes: 2

Views: 719

Answers (2)

ollik1

Reputation: 4540

Changing my comment to an answer as requested. Original comment:

You are modifying arrSum in the executor JVMs and printing its values in the driver JVM. You can map the iterators to singleton iterators and use collect to move the values to the driver. Also, don't use iterator.map for side effects; iterator.foreach is meant for that.

And here is a sample snippet showing how to do it. First, create an RDD with two partitions, 0 -> 1,2,3 and 1 -> 4,5. Naturally you would not need this in actual code, but since the sc.parallelize behaviour changes depending on the environment, this always creates uniform RDDs to reproduce:

object DemoPartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = key match {
    case num: Int => num
  }
}
val rdd = sc
  .parallelize(Seq((0, 1), (0, 2), (0, 3), (1, 4), (1, 5)))
  .partitionBy(DemoPartitioner)
  .map(_._2)

And then the actual trick:

val sumsByPartition = rdd.mapPartitionsWithIndex {
  case (partitionNum, it) => Iterator.single(partitionNum -> it.sum)
}.collect().toMap
println(sumsByPartition)

Outputs:

Map(0 -> 6, 1 -> 9)
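Since the original goal was the per-partition mean of LoudnessRate, the same singleton-iterator trick extends naturally: fold each partition into a (sum, count) pair and divide. Here is a minimal sketch along those lines (the partitionMean helper is my own name, not part of the answer above); it is written so the partition function can be checked on plain iterators without a cluster:

```scala
// Hypothetical helper: the function you would pass to mapPartitionsWithIndex.
// It folds the partition's iterator into a (sum, count) pair and emits a
// single (partitionIndex, mean) tuple, or nothing for an empty partition.
def partitionMean(partitionNum: Int, it: Iterator[Double]): Iterator[(Int, Double)] = {
  val (sum, count) = it.foldLeft((0.0, 0)) { case ((s, c), v) => (s + v, c + 1) }
  if (count == 0) Iterator.empty else Iterator.single(partitionNum -> sum / count)
}

// With an RDD[Double]: rdd.mapPartitionsWithIndex(partitionMean).collect().toMap
// Standalone check on a plain iterator:
println(partitionMean(0, Iterator(1.0, 2.0, 3.0)).toList) // List((0,2.0))
```

Emitting the count alongside the sum avoids a second pass over the data just to divide.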

Upvotes: 4

Nir Hedvat

Reputation: 870

The problem is that you're using arrSum (a regular collection) that is declared in your driver and updated in the executors. Whenever you need to do that, use Accumulators.

This should help
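For reference, a minimal sketch of that approach, assuming a SparkContext sc and the rdd of BAT objects from the question (the accumulator names are illustrative, not from this answer). Each task adds into the accumulator matching its partition index, and the merged values are read back on the driver only after an action has run:

```scala
// Sketch: one DoubleAccumulator per partition (names are illustrative).
val sums = Array.tabulate(numPartitions)(i => sc.doubleAccumulator(s"sum-$i"))

rdd.mapPartitionsWithIndex { (k, it) =>
  it.foreach(x => sums(k).add(x.LoudnessRate)) // side effect goes through the accumulator
  Iterator.empty[Int]
}.count() // any action forces evaluation; accumulator values merge on the driver

sums.zipWithIndex.foreach { case (acc, i) => println(s"partition $i: ${acc.value}") }
```

Note that accumulator updates are only guaranteed to be applied exactly once inside actions, not transformations, so this pattern is best kept simple.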

Upvotes: 0
