Maciek Leks

Reputation: 1448

Spark - LinearRegressionWithSGD on samples from the Coursera Machine Learning course by Stanford University

Software Version: Apache Spark v1.3

Context: I've been trying to "translate" Octave/MATLAB code to Scala on Apache Spark. More precisely, I work on ex1data1.txt and ex1data2.txt from the practical part (ex1) of the Coursera course. I've already made this translation into Julia (it went smoothly), but now I've been struggling with Spark... without success.

Problem: The performance of my implementation on Spark is very poor; I cannot even say it works correctly. That's why for ex1data1.txt I added a polynomial feature, and I also experimented with theta0 in two ways: using setIntercept(true), and using an extra non-normalized column of 1.0 values (in that case I set the intercept to false). I received only silly results. So I then decided to start working with ex1data2.txt. Below you can find the code and the expected result. Of course the Spark result is wrong.
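To be concrete, this is what I mean by the extra column of ones; a minimal sketch (the addBiasColumn helper is only illustrative and is not part of the code below):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Illustrative helper: prepend a constant 1.0 feature so that theta0 is
// learned as an ordinary weight (used together with setIntercept(false)).
def addBiasColumn(point: LabeledPoint): LabeledPoint =
  LabeledPoint(point.label, Vectors.dense(1.0 +: point.features.toArray))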

Did you have similar experience? I will be grateful for your help.

The Scala code for ex1data2.txt:

import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.SquaredL2Updater
import org.apache.spark.mllib.regression.{LinearRegressionModel, LinearRegressionWithSGD, LabeledPoint}
import org.apache.spark.{SparkContext, SparkConf}


object MLibOnEx1data2 extends App {
  val conf = new SparkConf()
  conf.set("spark.app.name", "coursera ex1data2.txt test")

  val sc = new SparkContext(conf)
  val input = sc.textFile("hdfs:///ex1data2.txt")

  val trainData = input.map { line =>
    val parts = line.split(',')
    val y = parts(2).toDouble
    val features = Vectors.dense(parts(0).toDouble, parts(1).toDouble)
    println(s"x = $features y = $y")
    LabeledPoint(y, features)
  }.cache()

  // Building the model
  val numIterations = 1500
  val alpha = 0.01 // step size, i.e. the learning rate alpha from the Octave code

  // Scale the features
  val scaler = new StandardScaler(withMean = true, withStd = true)
    .fit(trainData.map(x => x.features))
  val scaledTrainData = trainData.map{ td =>
    val normFeatures = scaler.transform(td.features)
    println(s"normalized features = $normFeatures")
    LabeledPoint(td.label, normFeatures)
  }.cache()

  val tsize = scaledTrainData.count()
  println(s"Training set size is $tsize")


  val alg = new LinearRegressionWithSGD().setIntercept(true)
  alg.optimizer
    .setNumIterations(numIterations)
    .setStepSize(alpha)
    .setUpdater(new SquaredL2Updater)
    .setRegParam(0.0)  //regularization - off

  val model = alg.run(scaledTrainData)

  println(s"Theta is $model.weights")

  val total1 = model.predict(scaler.transform(Vectors.dense(1650, 3)))

  println(s"Estimate the price of a 1650 sq-ft, 3 br house = $total1 dollars") //it should give ~ $289314.620338

  // Evaluate model on training examples and compute training error
  val valuesAndPreds = scaledTrainData.map { point =>
    val prediction = model.predict(point.features)
    (point.label, prediction)
  }
  // Halving the MSE matches the Coursera cost function J(theta) = (1/2m) * sum((h(x) - y)^2)
  val MSE = valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.mean() / 2
  println("Training Mean Squared Error = " + MSE)



  // Save and load the model; -1 signals that either the save or the load
  // failed (e.g. because the path already exists)
  val trySaveAndLoad = util.Try(model.save(sc, "myModelPath"))
    .flatMap { _ => util.Try(LinearRegressionModel.load(sc, "myModelPath")) }
    .getOrElse(-1)

  println(s"trySaveAndLoad result is $trySaveAndLoad")
}

STDOUT result is:

Training set size is 47

Theta is (weights=[52090.291641275864,19342.034885388926], intercept=181295.93717032953)

Estimate the price of a 1650 sq-ft, 3 br house = 153983.5541846754 dollars

Training Mean Squared Error = 1.5876093757127676E10

trySaveAndLoad result is -1

Upvotes: 2

Views: 698

Answers (1)

zero323

Reputation: 330073

Well, after some digging I believe there is nothing wrong here. First, I saved the content of valuesAndPreds to a text file:

valuesAndPreds.map {
  case (x, y) => s"$x,$y"
}.repartition(1).saveAsTextFile("results.txt")

Note that saveAsTextFile writes a directory, so with a single partition the output lands in results.txt/part-00000.

The rest of the code is written in R.

First, let's create a model using the closed form solution:

# Load data
df <- read.csv('results.txt/ex1data2.txt', header=FALSE)
# Scale features
df[, 1:2] <- apply(df[, 1:2], 2, scale)
# Build linear model 
model <- lm(V3 ~ ., df)

For reference:

> summary(model)

Call:
lm(formula = V3 ~ ., data = df)

Residuals:
    Min      1Q  Median      3Q     Max 
-130582  -43636  -10829   43698  198147 

Coefficients:
            Estimate Std. Error t value Pr(>|t|)    
(Intercept)   340413       9637  35.323  < 2e-16 ***
V1            110631      11758   9.409 4.22e-12 ***
V2             -6650      11758  -0.566    0.575    
---
Signif. codes:  0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1

Residual standard error: 66070 on 44 degrees of freedom
Multiple R-squared:  0.7329,    Adjusted R-squared:  0.7208 
F-statistic: 60.38 on 2 and 44 DF,  p-value: 2.428e-13
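As an aside, the same closed-form fit can be written as the normal equation, theta = (X^T X)^(-1) X^T y. A minimal Scala sketch using Breeze (normalEquation and its inputs are illustrative assumptions; X would be the design matrix of scaled features plus a bias column, y the labels):

import breeze.linalg.{inv, DenseMatrix, DenseVector}

// Illustrative sketch of the normal equation: theta = (X^T X)^{-1} X^T y
def normalEquation(x: DenseMatrix[Double], y: DenseVector[Double]): DenseVector[Double] =
  inv(x.t * x) * (x.t * y)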

Now the prediction:

closedFormPrediction <- predict(model, df)
closedFormRMSE <- sqrt(mean((closedFormPrediction - df$V3)**2))
plot(
   closedFormPrediction, df$V3,
   ylab="Actual", xlab="Predicted",
   main=paste("Closed form, RMSE: ", round(closedFormRMSE, 3)))

[Plot: closed-form predicted vs. actual values, RMSE in the title]

Now we can compare the above to the SGD results:

sgd <- read.csv('results.txt/part-00000', header=FALSE)
sgdRMSE <- sqrt(mean(sgd$V2 - sgd$V1)**2)

plot(
   sgd$V2, sgd$V1, ylab="Actual",
   xlab="Predicted", main=paste("SGD, RMSE: ", round(sgdRMSE, 3)))

[Plot: SGD predicted vs. actual values, RMSE in the title]
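Incidentally, the same training RMSE can be computed directly on the RDD in Scala, without the round trip through a text file; a minimal sketch reusing valuesAndPreds from the question:

// Training RMSE straight from the (label, prediction) pairs,
// equivalent to the R computation above.
val sgdRMSE = math.sqrt(
  valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.mean())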

Finally, let's compare both:

plot(
   sgd$V2, closedFormPrediction,
   xlab="SGD", ylab="Closed form", main="SGD vs Closed form")

[Plot: SGD vs. closed-form predictions]

So, the results are clearly not perfect, but nothing seems to be completely off here.

Upvotes: 1
