Reputation: 1448
Software Version: Apache Spark v1.3
Context: I've been trying to "translate" Octave/MATLAB code to Scala on Apache Spark. More precisely, I'm working with ex1data1.txt and ex1data2.txt from the practical exercise ex1 of the Coursera course. I made the same translation to Julia (it went smoothly), and now I've been struggling with Spark, without success.
Problem: The performance of my implementation on Spark is very poor; I cannot even say it works correctly. That's why, for ex1data1.txt, I added a polynomial feature and tried both fitting theta0 via setIntercept(true) and adding an extra non-normalized column of 1.0 values (in that case with the intercept set to false). I got only nonsensical results, so I decided to move on to ex1data2.txt. Below you can find the code and the expected result. Of course, the Spark result is wrong.
Have you had a similar experience? I would be grateful for your help.
The Scala code for ex1data2.txt:
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.SquaredL2Updater
import org.apache.spark.mllib.regression.{LinearRegressionModel, LinearRegressionWithSGD, LabeledPoint}
import org.apache.spark.{SparkContext, SparkConf}
object MLibOnEx1data2 extends App {
  val conf = new SparkConf()
  conf.set("spark.app.name", "coursera ex1data2.txt test")
  val sc = new SparkContext(conf)
  val input = sc.textFile("hdfs:///ex1data2.txt")
  val trainData = input.map { line =>
    val parts = line.split(',')
    val y = parts(2).toDouble
    val features = Vectors.dense(parts(0).toDouble, parts(1).toDouble)
    println(s"x = $features y = $y")
    LabeledPoint(y, features)
  }.cache()
  // Building the model
  val numIterations = 1500
  val alpha = 0.01
  // Scale the features
  val scaler = new StandardScaler(withMean = true, withStd = true)
    .fit(trainData.map(x => x.features))
  val scaledTrainData = trainData.map{ td =>
    val normFeatures = scaler.transform(td.features)
    println(s"normalized features = $normFeatures")
    LabeledPoint(td.label, normFeatures)
  }.cache()
  val tsize = scaledTrainData.count()
  println(s"Training set size is $tsize")
  val alg = new LinearRegressionWithSGD().setIntercept(true)
  alg.optimizer
    .setNumIterations(numIterations)
    .setStepSize(alpha)
    .setUpdater(new SquaredL2Updater)
    .setRegParam(0.0) //regularization - off
  val model = alg.run(scaledTrainData)
  println(s"Theta is $model.weights")
  val total1 = model.predict(scaler.transform(Vectors.dense(1650, 3)))
  println(s"Estimate the price of a 1650 sq-ft, 3 br house = $total1 dollars") //it should give ~ $289314.620338
  // Evaluate model on training examples and compute training error
  val valuesAndPreds = scaledTrainData.map { point =>
    val prediction = model.predict(point.features)
    (point.label, prediction)
  }
  val MSE = ((valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()) / 2)
  println("Training Mean Squared Error = " + MSE)
  // Save and load model
  val trySaveAndLoad = util.Try(model.save(sc, "myModelPath"))
    .flatMap { _ => util.Try(LinearRegressionModel.load(sc, "myModelPath")) }
    .getOrElse(-1)
  println(s"trySaveAndLoad result is $trySaveAndLoad")
}
STDOUT result is:
Training set size is 47
Theta is (weights=[52090.291641275864,19342.034885388926], intercept=181295.93717032953).weights
Estimate the price of a 1650 sq-ft, 3 br house = 153983.5541846754 dollars
Training Mean Squared Error = 1.5876093757127676E10
trySaveAndLoad result is -1
Upvotes: 2
Views: 698
Reputation: 330073
Well, after some digging I believe there is nothing wrong here. First I saved the content of valuesAndPreds to a text file:
valuesAndPreds.map {
  case (x, y) => s"$x,$y"
}.repartition(1).saveAsTextFile("results.txt")
The rest of the code is written in R.
First, let's create a model using the closed-form solution:
# Load data
df <- read.csv('results.txt/ex1data2.txt', header=FALSE)
# Scale features
df[, 1:2] <- apply(df[, 1:2], 2, scale)
# Build linear model
model <- lm(V3 ~ ., df)
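For reference, the two steps above correspond to the standard formulas below. R's scale standardizes each column using the sample standard deviation, and lm computes the ordinary least squares fit, whose closed form is the normal equation (lm solves it through a QR decomposition rather than an explicit inverse). The symbols are introduced here only for illustration: m is the number of rows, x_ij is feature j of row i, and Z is the standardized feature matrix with a leading column of ones.

```latex
z_{ij} = \frac{x_{ij} - \bar{x}_j}{s_j},
\qquad
s_j = \sqrt{\frac{1}{m-1} \sum_{i=1}^{m} \left(x_{ij} - \bar{x}_j\right)^2},
\qquad
\hat{\theta} = \left(Z^\top Z\right)^{-1} Z^\top y
```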
For reference:
> summary(model)
Call:
lm(formula = V3 ~ ., data = df)
Residuals:
    Min      1Q  Median      3Q     Max
-130582  -43636  -10829   43698  198147
Coefficients:
            Estimate Std. Error t value Pr(>|t|)
(Intercept)   340413       9637  35.323  < 2e-16 ***
V1            110631      11758   9.409 4.22e-12 ***
V2             -6650      11758  -0.566    0.575
---
Signif. codes: 0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1
Residual standard error: 66070 on 44 degrees of freedom
Multiple R-squared: 0.7329, Adjusted R-squared: 0.7208
F-statistic: 60.38 on 2 and 44 DF, p-value: 2.428e-13
Now prediction:
closedFormPrediction <- predict(model, df)
closedFormRMSE <- sqrt(mean((closedFormPrediction - df$V3)**2))
plot(
closedFormPrediction, df$V3,
ylab="Actual", xlab="Predicted",
main=paste("Closed form, RMSE: ", round(closedFormRMSE, 3)))
Now we can compare the above to the SGD results:
sgd <- read.csv('results.txt/part-00000', header=FALSE)
sgdRMSE <- sqrt(mean((sgd$V2 - sgd$V1)**2))
plot(
sgd$V2, sgd$V1, ylab="Actual",
xlab="Predicted", main=paste("SGD, RMSE: ", round(sgdRMSE, 3)))
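The same RMSE could also be computed on the Scala side before exporting anything. The following is a minimal plain-Scala sketch (no Spark required; the object and method names are hypothetical) that takes (actual, predicted) pairs in the same order as valuesAndPreds. Each residual has to be squared before averaging, otherwise positive and negative errors cancel out.

```scala
object Rmse {
  /** Root mean squared error over (actual, predicted) pairs. */
  def rmse(pairs: Seq[(Double, Double)]): Double = {
    require(pairs.nonEmpty, "need at least one pair")
    // Square each residual first, then average, then take the root.
    val meanSquared =
      pairs.map { case (v, p) => math.pow(v - p, 2) }.sum / pairs.size
    math.sqrt(meanSquared)
  }
}
```

With only 47 rows, something like Rmse.rmse(valuesAndPreds.collect().toSeq) should be cheap enough to run on the driver.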
Finally, let's compare both:
plot(
sgd$V2, closedFormPrediction,
xlab="SGD", ylab="Closed form", main="SGD vs Closed form")
So the results are clearly not perfect, but nothing seems to be completely off here.
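For context on what the Spark fit is being measured against: the Octave exercise from the question fits the model with plain batch gradient descent on the cost J(theta) = 1/(2m) * sum of squared errors, which is the same quantity the question's MSE line computes. Below is a minimal plain-Scala sketch of that update rule. It involves no Spark, the object and method names are hypothetical, and it is not meant to describe how Spark's optimizer works internally.

```scala
object BatchGradientDescent {
  /** Fits least squares by batch gradient descent.
    * xs holds the feature rows (without a bias column), ys the targets.
    * Returns theta, where theta(0) is the intercept. */
  def fit(xs: Array[Array[Double]], ys: Array[Double],
          alpha: Double, iterations: Int): Array[Double] = {
    val m = xs.length
    val n = xs.head.length
    // Prepend a constant 1.0 so theta(0) plays the role of the intercept.
    val design = xs.map(row => 1.0 +: row)
    var theta = Array.fill(n + 1)(0.0)
    for (_ <- 0 until iterations) {
      // Residual h(x) - y for every example under the current theta.
      val errors = design.zip(ys).map { case (row, y) =>
        row.zip(theta).map { case (x, t) => x * t }.sum - y
      }
      // Gradient of J(theta) = 1/(2m) * sum(errors^2).
      val gradient = Array.tabulate(n + 1) { j =>
        design.zip(errors).map { case (row, e) => row(j) * e }.sum / m
      }
      // Simultaneous update of all parameters with a fixed step size.
      theta = theta.zip(gradient).map { case (t, g) => t - alpha * g }
    }
    theta
  }
}
```

On standardized features, calling fit with alpha = 0.01 and 1500 iterations mirrors the settings used in the question.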
Upvotes: 1