andrewmzhang

Reputation: 41

Spark spends a long time on HadoopRDD: Input split

I'm running logistic regression with SGD on a large libsvm file. The file is about 10 GB in size with 40 million training examples.

When I run my Scala code with spark-submit, I notice that Spark spends a lot of time logging lines like these:

18/02/07 04:44:50 INFO HadoopRDD: Input split: file:/ebs2/preprocess/xaa:234881024+33554432

18/02/07 04:44:51 INFO Executor: Finished task 6.0 in stage 1.0 (TID 7). 875 bytes result sent to driver

18/02/07 04:44:51 INFO TaskSetManager: Starting task 8.0 in stage 1.0 (TID 9, localhost, executor driver, partition 8, PROCESS_LOCAL, 7872 bytes)

18/02/07 04:44:51 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 7) in 1025 ms on localhost (executor driver) (7/307)

Why is Spark doing so many 'HadoopRDD: Input split' operations? What is their purpose, and how can I speed this step up or get rid of it?
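For context, each logged split covers 33554432 bytes (32 MB), so a ~10 GB file yields roughly 300 splits, which lines up with the '(7/307)' task counter above. Below is a minimal sketch (illustrative only, using the train variable from the code further down) for checking how many partitions the loaded RDD actually has:

// Sketch: print how many partitions Spark created for the training data.
// `train` is assumed to be the RDD[LabeledPoint] returned by MLUtils.loadLibSVMFile.
val numParts = train.partitions.length
println(s"train was split into $numParts partitions")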

Here is the code:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.evaluation.MulticlassMetrics 

import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.optimization.L1Updater
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import scala.compat.Platform._ 


object test {

    def main(args: Array[String]) {

        val nnodes = 1
        val epochs = 3

        val conf = new SparkConf().setAppName("Test Name")
        val sc = new SparkContext(conf)

        val t0 = currentTime
        // loadLibSVMFile(sc, path, numFeatures, minPartitions)
        val train = MLUtils.loadLibSVMFile(sc, "/ebs2/preprocess/xaa", 262165, 4)
        val test = MLUtils.loadLibSVMFile(sc, "/ebs2/preprocess/xab", 262165, 4)
        val t1 = currentTime
    
        println("START")
        val lrAlg = new LogisticRegressionWithSGD()
        lrAlg.optimizer.setMiniBatchFraction(10.0/40000000.0)
        lrAlg.optimizer.setNumIterations(12000000)
        lrAlg.optimizer.setStepSize(0.01)

        val model = lrAlg.run(train)        


        model.clearThreshold()
        val scoreAndLabels = test.map { point =>
              val score = model.predict(point.features)
              (score, point.label)
        }

        val metrics = new BinaryClassificationMetrics(scoreAndLabels)
        val auROC = metrics.areaUnderROC()
        println("Area under ROC = " + auROC)
     }
}

Upvotes: 3

Views: 1723

Answers (1)

andrewmzhang

Reputation: 41

I fixed the speed issue by coalescing and caching the training data:

train = train.coalesce(1)
train.cache()

and by increasing the memory to a total of 64 GB. Previously, Spark may not have been caching properly because there wasn't enough RAM.
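Here is a minimal sketch of that fix (variable names follow the question's code; treat it as illustrative rather than the exact code I ran):

// Coalesce to a single partition (cuts per-task overhead for ~300 small tasks)
// and cache the parsed RDD so SGD iterations reuse it instead of re-reading the file.
val cachedTrain = train.coalesce(1).cache()
val model = lrAlg.run(cachedTrain)

In local mode the executors run inside the driver JVM, so the memory increase amounts to something like spark-submit --driver-memory 64g (the exact setting depends on how the job is deployed).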

Upvotes: 1
