Reputation: 41
I'm running logistic regression with SGD on a large LIBSVM-format file. The file is about 10 GB and contains 40 million training examples.
When I run my Scala code with spark-submit, I notice that Spark spends a lot of time emitting log lines like these:
18/02/07 04:44:50 INFO HadoopRDD: Input split: file:/ebs2/preprocess/xaa:234881024+33554432
18/02/07 04:44:51 INFO Executor: Finished task 6.0 in stage 1.0 (TID 7). 875 bytes result sent to driver
18/02/07 04:44:51 INFO TaskSetManager: Starting task 8.0 in stage 1.0 (TID 9, localhost, executor driver, partition 8, PROCESS_LOCAL, 7872 bytes)
18/02/07 04:44:51 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 7) in 1025 ms on localhost (executor driver) (7/307)
Why does Spark produce so many 'HadoopRDD: Input split' entries? What are they for, and how can I speed this part up or get rid of it?
Here is the code:
import org.apache.spark.SparkContext
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.optimization.L1Updater
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import scala.compat.Platform._
object test {
  def main(args: Array[String]) {
    val nnodes = 1
    val epochs = 3

    val conf = new SparkConf().setAppName("Test Name")
    val sc = new SparkContext(conf)

    // Load the training and test sets (numFeatures = 262165, minPartitions = 4)
    val t0 = currentTime
    val train = MLUtils.loadLibSVMFile(sc, "/ebs2/preprocess/xaa", 262165, 4)
    val test = MLUtils.loadLibSVMFile(sc, "/ebs2/preprocess/xab", 262165, 4)
    val t1 = currentTime
    println("START")

    // SGD configuration: roughly 10 examples per mini-batch out of 40M, 12M iterations
    val lrAlg = new LogisticRegressionWithSGD()
    lrAlg.optimizer.setMiniBatchFraction(10.0 / 40000000.0)
    lrAlg.optimizer.setNumIterations(12000000)
    lrAlg.optimizer.setStepSize(0.01)

    val model = lrAlg.run(train)
    model.clearThreshold()

    // Score the test set and compute the area under the ROC curve
    val scoreAndLabels = test.map { point =>
      val score = model.predict(point.features)
      (score, point.label)
    }
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    val auROC = metrics.areaUnderROC()
    println("Area under ROC = " + auROC)
  }
}
Upvotes: 3
Views: 1723
Reputation: 41
I fixed the speed issue by coalescing and caching the training RDD right after loading it (train is a val in the code above, so the coalesce/cache goes where the RDD is created rather than as a reassignment):
val train = MLUtils.loadLibSVMFile(sc, "/ebs2/preprocess/xaa", 262165, 4)
  .coalesce(1)
  .cache()
and by increasing the memory to 64 GB in total. Previously, Spark may not have been caching the RDD properly because there was not enough RAM.
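For completeness, here is a minimal sketch of how the fix slots into the loading step from the question. The object name CachedTraining, the count() call that forces materialization up front, and the spark-submit memory flag mentioned in the comment are illustrative assumptions on my part, not part of the original code; the load/coalesce/cache chain is the actual change.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.util.MLUtils

object CachedTraining {
  def main(args: Array[String]) {
    // Memory was raised at submission time, e.g. spark-submit --driver-memory 64g ...
    // (local mode, so the driver JVM holds the cached data); adjust to your own setup.
    val conf = new SparkConf().setAppName("Test Name")
    val sc = new SparkContext(conf)

    // Load once, collapse the ~307 input splits into a single partition,
    // and keep the deserialized LabeledPoints in memory.
    val train = MLUtils.loadLibSVMFile(sc, "/ebs2/preprocess/xaa", 262165, 4)
      .coalesce(1)
      .cache()

    // Force the read and the caching to happen now instead of inside the first SGD iteration.
    println("Cached " + train.count() + " training examples")
  }
}

My understanding is that each SGD iteration samples a mini-batch from the training RDD, so without caching every iteration goes back to the 10 GB file and re-enumerates the HadoopRDD input splits, which is why those log lines kept repeating.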
Upvotes: 1