Reputation: 451
I am using Spark and I would like to train a machine learning model.
Because of bad results, I would like to display the error made by the model at each epoch of the training (on the train and test datasets).
I will then use this information to determine whether my model is underfitting or overfitting the data.
Question: How can I draw the learning curve of a model with Spark?
In the following example, I have implemented my own evaluator and overridden the evaluate method to print the metrics I needed, but only two values were displayed (maxIter = 1000).
MinimalRunnableCode.scala:
import org.apache.spark.SparkConf
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.sql.SparkSession
object Min extends App {

  // Open spark session.
  val conf = new SparkConf()
    .setMaster("local")
    .set("spark.network.timeout", "800")

  val ss = SparkSession.builder
    .config(conf)
    .getOrCreate

  // Load data.
  val data = ss.createDataFrame(ss.sparkContext.parallelize(
    List(
      (Vectors.dense(1, 2), 1),
      (Vectors.dense(1, 3), 2),
      (Vectors.dense(1, 2), 1),
      (Vectors.dense(1, 3), 2),
      (Vectors.dense(1, 2), 1),
      (Vectors.dense(1, 3), 2),
      (Vectors.dense(1, 2), 1),
      (Vectors.dense(1, 3), 2),
      (Vectors.dense(1, 2), 1),
      (Vectors.dense(1, 3), 2),
      (Vectors.dense(1, 4), 3)
    )
  ))
    .withColumnRenamed("_1", "features")
    .withColumnRenamed("_2", "label")

  val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 42)

  // Create model of linear regression.
  val lr = new LinearRegression().setMaxIter(1000)

  // Create the parameter grid that will be used to train different versions of the linear model.
  val paramGrid = new ParamGridBuilder()
    .addGrid(lr.regParam, Array(0.001))
    .addGrid(lr.fitIntercept)
    .addGrid(lr.elasticNetParam, Array(0.5))
    .build()

  // Create trainer using validation split to evaluate which set of parameters performs the best.
  val trainValidationSplit = new TrainValidationSplit()
    .setEstimator(lr)
    .setEvaluator(new CustomRegressionEvaluator)
    .setEstimatorParamMaps(paramGrid)
    .setTrainRatio(0.8) // 80% of the data will be used for training and the remaining 20% for validation.

  // Run train validation split, and choose the best set of parameters.
  var model = trainValidationSplit.fit(training)

  // Close spark session.
  ss.stop()
}
CustomRegressionEvaluator.scala:
import org.apache.spark.ml.evaluation.{Evaluator, RegressionEvaluator}
import org.apache.spark.ml.param.{Param, ParamMap, Params}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
final class CustomRegressionEvaluator(override val uid: String)
  extends Evaluator with HasPredictionCol with HasLabelCol with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("regEval"))

  def checkNumericType(
      schema: StructType,
      colName: String,
      msg: String = ""): Unit = {
    val actualDataType = schema(colName).dataType
    val message = if (msg != null && msg.trim.length > 0) " " + msg else ""
    require(actualDataType.isInstanceOf[NumericType], s"Column $colName must be of type " +
      s"NumericType but was actually of type $actualDataType.$message")
  }

  def checkColumnTypes(
      schema: StructType,
      colName: String,
      dataTypes: Seq[DataType],
      msg: String = ""): Unit = {
    val actualDataType = schema(colName).dataType
    val message = if (msg != null && msg.trim.length > 0) " " + msg else ""
    require(dataTypes.exists(actualDataType.equals),
      s"Column $colName must be of type equal to one of the following types: " +
      s"${dataTypes.mkString("[", ", ", "]")} but was actually of type $actualDataType.$message")
  }

  var i = 0 // count the number of times the evaluate method is called

  override def evaluate(dataset: Dataset[_]): Double = {
    val schema = dataset.schema
    checkColumnTypes(schema, $(predictionCol), Seq(DoubleType, FloatType))
    checkNumericType(schema, $(labelCol))
    val predictionAndLabels = dataset
      .select(col($(predictionCol)).cast(DoubleType), col($(labelCol)).cast(DoubleType))
      .rdd
      .map { case Row(prediction: Double, label: Double) => (prediction, label) }
    val metrics = new RegressionMetrics(predictionAndLabels)
    val metric = "mae" match {
      case "rmse" => metrics.rootMeanSquaredError
      case "mse" => metrics.meanSquaredError
      case "r2" => metrics.r2
      case "mae" => metrics.meanAbsoluteError
    }
    println(s"$i $metric") // Print the metric
    i = i + 1 // Update counter
    metric
  }
  override def copy(extra: ParamMap): CustomRegressionEvaluator = defaultCopy(extra)
}

object CustomRegressionEvaluator extends DefaultParamsReadable[CustomRegressionEvaluator] {
  override def load(path: String): CustomRegressionEvaluator = super.load(path)
}
private[ml] trait HasPredictionCol extends Params {

  /**
   * Param for prediction column name.
   * @group param
   */
  final val predictionCol: Param[String] = new Param[String](this, "predictionCol", "prediction column name")
  setDefault(predictionCol, "prediction")

  /** @group getParam */
  final def getPredictionCol: String = $(predictionCol)
}

private[ml] trait HasLabelCol extends Params {

  /**
   * Param for label column name.
   * @group param
   */
  final val labelCol: Param[String] = new Param[String](this, "labelCol", "label column name")
  setDefault(labelCol, "label")

  /** @group getParam */
  final def getLabelCol: String = $(labelCol)
}
Upvotes: 2
Views: 1983
Reputation: 441
In case you came here from Databricks, here is my code for use in a Databricks notebook with PySpark. It is just an adaptation of eliasah's code.
from pyspark.ml.regression import (
    LinearRegression,
    LinearRegressionModel,
    LinearRegressionTrainingSummary,
)
from pyspark.ml.regression import FMRegressor, FMRegressionModel
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.mllib.util import LinearDataGenerator, MLUtils
from pyspark.ml.param import Params
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark import pandas

# Generate data
dfdata = LinearDataGenerator.generateLinearRDD(
    spark.sparkContext, nexamples=10_000, nfeatures=3, eps=0.25
).toDF()
dfdataML = MLUtils.convertVectorColumnsToML(dfdata, "features")

# Create model of linear regression.
linear_regressor = LinearRegression(maxIter=1_000)

# The following lines will create four sets of parameters (2 x 2 combinations).
parameter_grid = (
    ParamGridBuilder()
    .baseOn({linear_regressor.labelCol: "label"})
    .baseOn([linear_regressor.predictionCol, "predicted"])
    .addGrid(linear_regressor.regParam, [0.001])
    .addGrid(linear_regressor.fitIntercept, [True, False])
    .addGrid(linear_regressor.elasticNetParam, [0.5, 0.1])
    .build()
)

regression_evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="predicted", metricName="rmse"
)

# Create trainer using validation split to evaluate which set of parameters performs the best.
# I'm using the regular RegressionEvaluator here.
train_validation_split = TrainValidationSplit(
    estimator=linear_regressor,
    evaluator=regression_evaluator,
    estimatorParamMaps=parameter_grid,
    trainRatio=0.8,  # 80% training, 20% validation
    collectSubModels=True,  # To get all trained models during the grid search
    seed=42,
    parallelism=1,
)

# Run grid search training
trained_grid_search = train_validation_split.fit(dfdataML)

parameter_grid = trained_grid_search.getEstimatorParamMaps()
models = trained_grid_search.subModels

results = list()
for params, model in zip(parameter_grid, models):
    pdict = {p.name: v for p, v in params.items() if p.name not in ['labelCol', 'predictionCol']}
    group = str(pdict)
    pdict.update({'group': group})
    for iteration, cost in enumerate(model.summary.objectiveHistory):
        d = pdict.copy()
        d.update({'it': iteration, 'cost': cost})
        results.append(d)

dfresults = pandas.DataFrame(results).to_spark()
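Since dfresults holds one row per parameter combination (the group column) and iteration, you can then, for example, plot cost against it per group (with display() in the Databricks notebook, or after converting dfresults to pandas) to get one learning curve per set of parameters.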
Upvotes: 0
Reputation: 40370
Here is a possible solution for the specific case of LinearRegression and any other algorithm that supports an objective history (in this case, LinearRegressionTrainingSummary does the job).
Let's first create a minimal, complete, and verifiable example:
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.mllib.util.{LinearDataGenerator, MLUtils}
import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder().getOrCreate()
import org.apache.spark.ml.evaluation.RegressionEvaluator
import spark.implicits._
val data = {
  val tmp = LinearDataGenerator.generateLinearRDD(
    spark.sparkContext,
    nexamples = 10000,
    nfeatures = 4,
    eps = 0.05
  ).toDF
  MLUtils.convertVectorColumnsToML(tmp, "features")
}
As you've noticed, when you want to generate data for testing purposes for spark-mllib or spark-ml, it's advised to use data generators.
Now, let's train a linear regressor:
// Create model of linear regression.
val lr = new LinearRegression().setMaxIter(1000)

// The following line will create two sets of parameters.
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.001))
  .addGrid(lr.fitIntercept)
  .addGrid(lr.elasticNetParam, Array(0.5))
  .build()

// Create trainer using validation split to evaluate which set of parameters performs the best.
// I'm using the regular RegressionEvaluator here.
val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8) // 80% of the data will be used for training and the remaining 20% for validation.

// To retrieve subModels, make sure to set collectSubModels to true before fitting.
trainValidationSplit.setCollectSubModels(true)

// Run train validation split, and choose the best set of parameters.
var model = trainValidationSplit.fit(data)
Now that our model is trained, all we need is to get the objective history.
The following part needs a bit of gymnastics between the model and the sub-models' parameters.
In case you have a Pipeline or the like, this code needs to be modified, so use it carefully. It's just an example:
val objectiveHist = spark.sparkContext.parallelize(
  model.subModels.zip(model.getEstimatorParamMaps).map {
    case (m: LinearRegressionModel, pm: ParamMap) =>
      val history: Array[Double] = m.summary.objectiveHistory
      val idx: Seq[Int] = 1 until history.length
      // regParam, elasticNetParam, fitIntercept
      val parameters = pm.toSeq.map(pair => (pair.param.name, pair.value.toString)) match {
        case Seq(x, y, z) => (x._2, y._2, z._2)
      }
      (parameters._1, parameters._2, parameters._3, idx.zip(history).toMap)
  }).toDF("regParam", "elasticNetParam", "fitIntercept", "objectiveHistory")
We can now examine those metrics:
objectiveHist.show(false)
// +--------+---------------+------------+-------------------------------------------------------------------------------------------------------+
// |regParam|elasticNetParam|fitIntercept|objectiveHistory |
// +--------+---------------+------------+-------------------------------------------------------------------------------------------------------+
// |0.001 |0.5 |true |[1 -> 0.4999999999999999, 2 -> 0.4038796441909531, 3 -> 0.02659222058006269, 4 -> 0.026592220340980147]|
// |0.001 |0.5 |false |[1 -> 0.5000637621421942, 2 -> 0.4039303922115196, 3 -> 0.026592220673025396, 4 -> 0.02659222039347222]|
// +--------+---------------+------------+-------------------------------------------------------------------------------------------------------+
Notice that the training process actually stops after 4 iterations.
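If you want to draw the curve itself rather than read the raw maps, one option (just a sketch reusing the objectiveHist DataFrame built above) is to explode the objectiveHistory column into one (iteration, loss) row per sub-model, and feed that small result to whatever plotting tool you prefer:
import org.apache.spark.sql.functions.explode

// Explode the Map[Int, Double] column into key/value pairs, one row per iteration.
val learningCurve = objectiveHist
  .select($"regParam", $"elasticNetParam", $"fitIntercept", explode($"objectiveHistory"))
  .withColumnRenamed("key", "iteration")
  .withColumnRenamed("value", "loss")
  .orderBy($"fitIntercept", $"iteration")

learningCurve.show()
Each distinct combination of regParam, elasticNetParam and fitIntercept then corresponds to one curve.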
If you want just the number of iterations, you can do the following instead:
val objectiveHist2 = spark.sparkContext.parallelize(
  model.subModels.zip(model.getEstimatorParamMaps).map {
    case (m: LinearRegressionModel, pm: ParamMap) =>
      val history: Array[Double] = m.summary.objectiveHistory
      // regParam, elasticNetParam, fitIntercept
      val parameters = pm.toSeq.map(pair => (pair.param.name, pair.value.toString)) match {
        case Seq(x, y, z) => (x._2, y._2, z._2)
      }
      (parameters._1, parameters._2, parameters._3, history.size)
  }).toDF("regParam", "elasticNetParam", "fitIntercept", "iterations")
I've changed the number of features in the generator (nfeatures = 100) for the sake of demonstration:
objectiveHist2.show
// +--------+---------------+------------+----------+
// |regParam|elasticNetParam|fitIntercept|iterations|
// +--------+---------------+------------+----------+
// | 0.001| 0.5| true| 11|
// | 0.001| 0.5| false| 11|
// +--------+---------------+------------+----------+
Upvotes: 9