Asif
Asif

Reputation: 733

Spark code takes too much time to run on cluster

I have written a Spark application. My code works fine for smaller size population (dataset) but it takes too much time for the larger population (dataset).

Here is the definition of class.

case class myClass( d : Int  , max : Double , min : Double  , f:  (List[Double],Double)  =>  Double )   extends Serializable  {
 val random  = new Random()
 var pos      : Array[Double]          =   Array.fill ( d ) (  random.nextDouble()*(max-min)+min  )
 var vel      : Array[Double]          =   Array.fill ( d ) (  math.random)
 var PR     : Double                =   0.1         //Rate
 var LR     : Double                    =   0.95
 var fitness       : Double              =   f (this.pos.toList ,0)
 val Band : Double = 0.001

//Functions definition
def move  (  l_Best : Array  [  Double  ]  ,  sumL : Double , size : Int  ,  f :  (  List  [  Double  ] ,Double  )  =>  Double  )  : (  Array  [  Double  ]  ,  Double   )  = {

 val f   =  math.random   
 val temp1  =  ElementsWiseSubtract  ( pos  , l_Best )
 val temp2  =    temp1.map  (  _  * freq  ) 
 val newVel  =  ElementsWiseSum  ( temp2 , vel  )      // vel == this.vel
 var newPos = ElementsWiseSum  (  pos  ,   newVel )  // pos == this.pos
 if (  math.random   >   this.PR  )  {
    newPos  =  l_Best.map  (  _  *  (  Band *  (  sumL  /  size  )  ) ).toArray
 }
 val nFit = f  (  newPos.toList  , 0  )
 ( newPos   ,  nFit  )
} 
def ElementsWiseSum  (  arr  : Array[Double] , arr1  : Array[Double]   )  :  Array [Double] =  {
 var res  :  Array[ Double ]  =  Array()
 if  (  arr.length  ==  1  )
   res  =  Array  (  arr.head  +  arr1.head  )
 else
   res  =   ElementsWiseSum  (   arr.slice(0, arr.length/2) ,arr1.slice(0, arr1.length/2) )  ++
     ElementsWiseSum  (   arr.slice(arr.length/2,  arr.length ) ,  arr1.slice( arr1.length/2 , arr1.length)  )
 res
}
def ElementsWiseSubtract  (  arr  :Array[Double] , arr1  :Array[Double]   )  :  Array [Double] =  {
 var res  :  Array[ Double ]  =  Array()
 if  (  arr.length  ==  1  )
   res  =  Array  (  arr.head  -  arr1.head  )
 else
   res  =   ElementsWiseSubtract  (   arr.slice(0, arr.length/2) ,arr1.slice(0, arr1.length/2) )  ++
     ElementsWiseSubtract  (   arr.slice  (  arr.length / 2,  arr.length ) ,  arr1.slice  ( arr1.length / 2 , arr1.length )  )
 res
  }
}

Main function definition and creation of RDD is:

@tailrec 
final def Sphere  (  Alleles:List[Double]  ,  accumulator:Double  )  :  Double =  Alleles match  {
 case  Nil  =>  accumulator
 case  x  ::  xs  =>    Sphere (  xs  ,  accumulator  +  Math.pow  (  x , 2  )  )
}

val N = 10000               // population size
val d =  10000             //  dimensions
val nP = 20               //   partitions
val iterations  = 100000  //   total iterations. This could be less more 
val RDD = sc.parallelize(0 until  N , nP).mapPartitionsWithIndex{ (index,iter)  =>
      val data  =  iter.map(i =>
        new myClass(  d, max,min   ,  Sphere   )   )
      data
    }.persist(StorageLevel.MEMORY_AND_DISK)  


val itr = 1 
val res = RDD.mapPartitionsWithIndex {
  (index, Iterator) => {
    var li = Iterator.toArray
      li = li.sortWith(_.fitness < _.fitness)

    val res = loop(li, iterations, itr)
    val bests = res.sortWith(_.fitness < _.fitness).take(5).map(x => (x, index))
    bests.toIterator
  }
}
 @tailrec
private def loop(arr: Array[myClass], iteration: Int, itret: Int): Array[myClass] = {
iteration match {
  case 0 => arr
  case _ => {
    arr.map { j =>
      val l_Best = arr.minBy(_.fitness).pos // sort based on fitness
      val l_Sum: Double = arr.map(_.LR).reduce( _ + _ )   // Calculate sum of Rate
      val res = j.move(l_Best, l_Sum, arr.size, Sphere)

      if (math.random < j.LR && res._2 < j.fitness) {
        j.pos = res._1
        j.fitness = res._2
        j.LR = j.LR * 0.95
        j.PR = 0.95 * (1 - math.pow(math.E, (-0.95 * itret)))
      }
    }
    loop(arr, iteration - 1, itret + 1)
  }
 }
}

I tested this code on 4 node cluster For N = 100 and d = 100. It takes less than one minute to complete 10,000 iterations, but for N = 10,000 and d = 10,000, it took 19 hours to complete just 500 iterations.

According to my observations, functions that do element-wise operations inside myClass class taking a long time. How can I increase its speed? Please give some suggestion. I want to execute it with the following configurations.

N = 10,000 ,d = 10,000 and iterations = 1000000000 (1 billion )

Upvotes: 0

Views: 147

Answers (1)

Charlie Flowers
Charlie Flowers

Reputation: 1380

You can greatly simplify your element-wise operations, e.g.:

def ElementsWiseSubtract ( arr :Array[Double] , arr1 :Array[Double] ) : Array [Double] = arr.zip(arr1).map(x => x._1 - x._2)

Upvotes: 3

Related Questions