Reputation: 914
I have heard many people say that Spark is good at sorting and distributed computing. Currently, our team is doing some research on Spark and Scala, and we are going to implement a sorting service on Spark. I have set up the Spark cluster and tried to run a sorting example on it, but the sorting time seems too long. Here is my code.
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
 * Created by on 1/1/15.
 */
object AdvancedSort {
  /**
   * bin/spark-submit --master spark://master:7077 --executor-memory 1024M --class com.my.sortedspark.AdvancedSort lib/sortedspark.jar 100000 3
   * @param args
   */
  def main(args: Array[String]) {
    val sampleSize = if (args.length > 0) args(0).toInt else 100000
    val slice = if (args.length > 1) args(1).toInt else 3
    sort(sampleSize, slice)
  }

  def sort(listSize: Int, slice: Int): Unit = {
    val conf = new SparkConf().setAppName(getClass.getName)
    val spark = new SparkContext(conf)

    val step1 = System.currentTimeMillis()
    val data = genRandom(listSize)
    val step2 = System.currentTimeMillis()
    println(">>>>>>>>>> genRandom : " + (step2 - step1))

    val distData = spark.parallelize(data, slice)
    val step3 = System.currentTimeMillis()
    println(">>>>>>>>>> parallelize : " + (step3 - step2))

    val result = distData.sortBy(x => x, true).collect
    val step4 = System.currentTimeMillis()
    println(">>>>>>>>>> sortBy and collect: " + (step4 - step3))
    println(">>>>>>>>>> total time : " + (step4 - step1))

    printlnArray(result, 0, 10)
    spark.stop()
  }

  /**
   * generate random numbers
   * @return
   */
  def genRandom(listSize: Int): List[Int] = {
    val range = 100000
    val listBuffer = new ListBuffer[Int]
    val random = new Random()
    for (i <- 1 to listSize) listBuffer += random.nextInt(range)
    listBuffer.toList
  }

  def printlnList(list: List[Int], start: Int, offset: Int) {
    for (i <- start until start + offset) println(">>>>>>>>> list : " + i + " | " + list(i))
  }

  def printlnArray(list: Array[Int], start: Int, offset: Int) {
    for (i <- start until start + offset) println(">>>>>>>>> list : " + i + " | " + list(i))
  }
}
After deploying the above code to the Spark cluster, I ran the following command from the master's Spark home directory:
bin/spark-submit --master spark://master:7077 --executor-memory 1024M --class com.my.sortedspark.AdvancedSort lib/sortedspark.jar 100000 3
The following are the cost times I got:
>>>>>>>>>> genRandom : 86
>>>>>>>>>> parallelize : 53
>>>>>>>>>> sortBy and collect: 6756
This looks strange, because if I sort 100000 random Ints with Scala's sorted method on my local machine, the cost time is quicker than Spark's.
import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
 * Created by on 1/5/15.
 */
object ScalaSort {
  def main(args: Array[String]) {
    val list = genRandom(1000000)

    val start = System.currentTimeMillis()
    val result = list.sorted
    val end = System.currentTimeMillis()
    println(">>>>>>>>>>>>>>>>>> cost time : " + (end - start))
  }

  /**
   * generate random numbers
   * @return
   */
  def genRandom(listSize: Int): List[Int] = {
    val range = 100000
    val listBuffer = new ListBuffer[Int]
    val random = new Random()
    for (i <- 1 to listSize) listBuffer += random.nextInt(range)
    listBuffer.toList
  }
}
Cost time of Scala's sorted method on my local machine:
>>>>>>>>>>>>>>>>>> cost time : 169
In my opinion, the following factors consume Spark's sorting time (see the timing sketch after this list):
data transfer between the master and the workers
sorting on each worker is quick, but the merge may be slow
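For reference, here is a rough sketch (based on my code above, not something I have benchmarked yet) of how the sort and the collect could be timed separately, so the shuffle/merge cost and the cost of pulling everything back to the driver show up on their own lines:

// Assumes the same SparkContext `spark`, `data`, and `slice` as above.
val distData = spark.parallelize(data, slice)

val t0 = System.currentTimeMillis()
// cache() keeps the sorted partitions in memory, so the later collect
// does not redo the sort.
val sortedRdd = distData.sortBy(x => x, true).cache()
sortedRdd.count()                // forces the shuffle and sort on the cluster
val t1 = System.currentTimeMillis()

val result = sortedRdd.collect() // pulls all sorted data back to the driver
val t2 = System.currentTimeMillis()

println(">>>>>>>>>> sort only    : " + (t1 - t0))
println(">>>>>>>>>> collect only : " + (t2 - t1))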
Does any Spark expert know why this happens?
Upvotes: 3
Views: 1101
Reputation: 3247
Spark is made for big data. When you feed it tiny amounts of data, it appears slower because distributing the work over all the cores of the cluster takes more time than it would take to sort the data normally. Try using bigger data, or instead of Spark use parallel collections (ParCollections) in Scala:
collection.par.<any code here>
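A minimal sketch of what that could look like (assuming Scala 2.x, where .par is built in; the object and variable names are just illustrative):

import scala.util.Random

object ParCollectionsExample {
  def main(args: Array[String]) {
    val data = List.fill(100000)(Random.nextInt(100000))

    // .par wraps the list in a parallel collection; bulk operations such as
    // map, filter and reduce are then spread over the local cores, with none
    // of Spark's scheduling, serialization or network overhead.
    val start = System.currentTimeMillis()
    val total = data.par.map(_.toLong).sum
    val end = System.currentTimeMillis()

    println("parallel map + sum took " + (end - start) + " ms, sum = " + total)
  }
}

Note that .par speeds up bulk operations like map/filter/reduce; a plain sort of 100000 Ints is already fast sequentially, as the local measurement in the question shows.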
Upvotes: 1