Reputation: 733
I have written a Spark application. My code works fine for a small population (dataset), but it takes too long for a large one.
Here is the class definition:
case class myClass(d: Int, max: Double, min: Double, f: (List[Double], Double) => Double) extends Serializable {
val random = new Random()
var pos: Array[Double] = Array.fill(d)(random.nextDouble() * (max - min) + min)
var vel: Array[Double] = Array.fill(d)(math.random)
var PR: Double = 0.1 // rate
var LR: Double = 0.95
var fitness: Double = f(this.pos.toList, 0)
val Band: Double = 0.001
// Function definitions
def move(l_Best: Array[Double], sumL: Double, size: Int, f: (List[Double], Double) => Double): (Array[Double], Double) = {
val freq = math.random // the original `val f` shadowed the fitness-function parameter and left `freq` below undefined
val temp1 = ElementsWiseSubtract(pos, l_Best)
val temp2 = temp1.map(_ * freq)
val newVel = ElementsWiseSum(temp2, vel) // vel == this.vel
var newPos = ElementsWiseSum(pos, newVel) // pos == this.pos
if (math.random > this.PR) {
newPos = l_Best.map(_ * (Band * (sumL / size)))
}
val nFit = f(newPos.toList, 0)
(newPos, nFit)
}
def ElementsWiseSum(arr: Array[Double], arr1: Array[Double]): Array[Double] = {
var res: Array[Double] = Array()
if (arr.length == 1)
res = Array(arr.head + arr1.head)
else
res = ElementsWiseSum(arr.slice(0, arr.length / 2), arr1.slice(0, arr1.length / 2)) ++
ElementsWiseSum(arr.slice(arr.length / 2, arr.length), arr1.slice(arr1.length / 2, arr1.length))
res
}
def ElementsWiseSubtract(arr: Array[Double], arr1: Array[Double]): Array[Double] = {
var res: Array[Double] = Array()
if (arr.length == 1)
res = Array(arr.head - arr1.head)
else
res = ElementsWiseSubtract(arr.slice(0, arr.length / 2), arr1.slice(0, arr1.length / 2)) ++
ElementsWiseSubtract(arr.slice(arr.length / 2, arr.length), arr1.slice(arr1.length / 2, arr1.length))
res
}
}
The main function and the RDD creation are:
@tailrec
final def Sphere(Alleles: List[Double], accumulator: Double): Double = Alleles match {
case Nil => accumulator
case x :: xs => Sphere(xs, accumulator + Math.pow(x, 2))
}
val N = 10000 // population size
val d = 10000 // dimensions
val nP = 20 // partitions
val iterations = 100000 // total iterations; this could be more or less
val RDD = sc.parallelize(0 until N, nP).mapPartitionsWithIndex { (index, iter) =>
iter.map(i => myClass(d, max, min, Sphere)) // case class, so `new` is not needed
}.persist(StorageLevel.MEMORY_AND_DISK)
val itr = 1
val res = RDD.mapPartitionsWithIndex {
(index, iter) => { // the original name `Iterator` shadowed scala.collection.Iterator
var li = iter.toArray
li = li.sortWith(_.fitness < _.fitness)
val res = loop(li, iterations, itr)
val bests = res.sortWith(_.fitness < _.fitness).take(5).map(x => (x, index))
bests.toIterator
}
}
@tailrec
private def loop(arr: Array[myClass], iteration: Int, itret: Int): Array[myClass] = {
iteration match {
case 0 => arr
case _ => {
arr.foreach { j => // the result of the original map was discarded, so foreach is clearer
val l_Best = arr.minBy(_.fitness).pos // best position by fitness
val l_Sum: Double = arr.map(_.LR).sum // sum of rates
val res = j.move(l_Best, l_Sum, arr.size, Sphere)
if (math.random < j.LR && res._2 < j.fitness) {
j.pos = res._1
j.fitness = res._2
j.LR = j.LR * 0.95
j.PR = 0.95 * (1 - math.pow(math.E, (-0.95 * itret)))
}
}
loop(arr, iteration - 1, itret + 1)
}
}
}
I tested this code on a 4-node cluster. For N = 100 and d = 100, it takes less than one minute to complete 10,000 iterations, but for N = 10,000 and d = 10,000 it took 19 hours to complete just 500 iterations.
From my observations, the element-wise functions inside the myClass class are taking most of the time. How can I speed them up? Please give some suggestions. I want to run it with the following configuration:
N = 10,000, d = 10,000, and iterations = 1,000,000,000 (1 billion)
Upvotes: 0
Views: 147
Reputation: 1380
You can greatly simplify your element-wise operations, e.g.:
def ElementsWiseSubtract(arr: Array[Double], arr1: Array[Double]): Array[Double] = arr.zip(arr1).map(x => x._1 - x._2)
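As a quick sanity check outside Spark, here is a self-contained sketch of that zip-based subtraction next to a plain while-loop variant; the loop avoids the intermediate tuple array that `zip` allocates, which can matter at d = 10,000. The object name `ElementWiseDemo` is just for illustration:

```scala
object ElementWiseDemo {
  // One-pass zip-based version, as suggested above.
  def subtractZip(a: Array[Double], b: Array[Double]): Array[Double] =
    a.zip(b).map(x => x._1 - x._2)

  // While-loop version: writes into a preallocated array, so it skips
  // the Array[(Double, Double)] that zip builds before mapping.
  def subtractLoop(a: Array[Double], b: Array[Double]): Array[Double] = {
    val res = new Array[Double](a.length)
    var i = 0
    while (i < a.length) {
      res(i) = a(i) - b(i)
      i += 1
    }
    res
  }

  def main(args: Array[String]): Unit = {
    val a = Array(3.0, 5.0, 7.0)
    val b = Array(1.0, 2.0, 3.0)
    println(subtractZip(a, b).mkString(","))  // 2.0,3.0,4.0
    println(subtractLoop(a, b).mkString(",")) // 2.0,3.0,4.0
  }
}
```

Both versions are linear in the array length, unlike the original recursive split, which repeatedly copies sub-arrays with `slice`.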
Upvotes: 3