Theophile Champion

Spark: Draw learning curve of a model with spark

I am using Spark and I would like to train a machine learning model.

Because of bad results, I would like to display the error made by the model at each epoch of the training (on the training and test datasets).

I will then use this information to determine whether my model is underfitting or overfitting the data.

Question: How can I draw the learning curve of a model with Spark?

In the following example, I have implemented my own evaluator and overridden the `evaluate` method to print the metrics I needed, but only two values were displayed (with maxIter = 1000).


import org.apache.spark.SparkConf
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.sql.SparkSession

/** Minimal reproduction: tunes a linear regression with `TrainValidationSplit`, using
  * `CustomRegressionEvaluator` to print the validation metric every time it is computed.
  *
  * Why only a few values get printed: `TrainValidationSplit` calls its evaluator once per
  * parameter map, on the validation split, after each model has been fitted. It does not
  * call it once per optimizer iteration, so the evaluator cannot produce a learning curve.
  * The per-iteration objective lives in the fitted model's `summary.objectiveHistory`.
  */
object Min extends App {

  // Open spark session.
  val conf = new SparkConf()
    .setAppName("Min")
    .setIfMissing("spark.master", "local[*]") // run locally unless spark-submit supplies a master
    .set("", "800") // NOTE(review): the configuration key was lost from this snippet — restore it

  val ss = SparkSession.builder
    .config(conf)
    .getOrCreate()

  // Load data.
  val data = ss.createDataFrame(ss.sparkContext.parallelize(Seq(
      (Vectors.dense(1, 2), 1),
      (Vectors.dense(1, 3), 2),
      (Vectors.dense(1, 2), 1),
      (Vectors.dense(1, 3), 2),
      (Vectors.dense(1, 2), 1),
      (Vectors.dense(1, 3), 2),
      (Vectors.dense(1, 2), 1),
      (Vectors.dense(1, 3), 2),
      (Vectors.dense(1, 2), 1),
      (Vectors.dense(1, 3), 2),
      (Vectors.dense(1, 4), 3)
    )))
    .withColumnRenamed("_1", "features")
    .withColumnRenamed("_2", "label")

  val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 42)

  // Create model of linear regression.
  val lr = new LinearRegression().setMaxIter(1000)

  // Create parameters grid that will be used to train different version of the linear model.
  val paramGrid = new ParamGridBuilder()
    .addGrid(lr.regParam, Array(0.001))
    .addGrid(lr.elasticNetParam, Array(0.5))
    .build()

  // Create trainer using validation split to evaluate which set of parameters performs the best.
  val trainValidationSplit = new TrainValidationSplit()
    .setEstimator(lr)
    .setEvaluator(new CustomRegressionEvaluator)
    .setEstimatorParamMaps(paramGrid)
    .setTrainRatio(0.8) // 80% of the data will be used for training and the remaining 20% for validation.

  // Run train validation split, and choose the best set of parameters.
  val model = trainValidationSplit.fit(training)

  // Close spark session.
  ss.stop()
}


import org.apache.spark.ml.evaluation.{Evaluator, RegressionEvaluator}
import org.apache.spark.ml.param.{Param, ParamMap, Params}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

/** Regression evaluator that prints the selected metric every time it is evaluated.
  *
  * Note that `TrainValidationSplit` calls an evaluator once per estimator parameter map,
  * on the validation split, after fitting has finished. It does not call it once per
  * optimizer iteration, so the printed values are NOT a learning curve. Use the fitted
  * model's `summary.objectiveHistory` for that.
  *
  * @param uid unique identifier of this evaluator instance
  */
final class CustomRegressionEvaluator(override val uid: String)
    extends Evaluator
    with HasPredictionCol
    with HasLabelCol
    with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("regEval"))

  /** Metric to compute: "rmse", "mse", "r2" or "mae" (the default, as before). */
  val metricName: Param[String] = new Param[String](
    this,
    "metricName",
    "metric name in evaluation (mse|rmse|r2|mae)",
    (value: String) => Seq("mse", "rmse", "r2", "mae").contains(value))

  setDefault(metricName, "mae")

  def getMetricName: String = $(metricName)

  def setMetricName(value: String): this.type = set(metricName, value)

  /** Requires column `colName` of `schema` to have a numeric type.
    *
    * @param msg optional extra text appended to the error message
    * @throws IllegalArgumentException if the column is missing or not numeric
    */
  def checkNumericType(
      schema: StructType,
      colName: String,
      msg: String = ""): Unit = {
    val actualDataType = schema(colName).dataType
    val message = if (msg != null && msg.trim.nonEmpty) " " + msg else ""
    require(actualDataType.isInstanceOf[NumericType], s"Column $colName must be of type " +
      s"NumericType but was actually of type $actualDataType.$message")
  }

  /** Requires column `colName` of `schema` to have one of `dataTypes`.
    *
    * @param msg optional extra text appended to the error message
    * @throws IllegalArgumentException if the column is missing or has another type
    */
  def checkColumnTypes(
      schema: StructType,
      colName: String,
      dataTypes: Seq[DataType],
      msg: String = ""): Unit = {
    val actualDataType = schema(colName).dataType
    val message = if (msg != null && msg.trim.nonEmpty) " " + msg else ""
    require(dataTypes.exists(actualDataType.equals),
      s"Column $colName must be of type equal to one of the following types: " +
        s"${dataTypes.mkString("[", ", ", "]")} but was actually of type $actualDataType.$message")
  }

  var i = 0 // count the number of time the evaluate method is called

  /** Computes the selected metric on `dataset`, prints it with the call index, and returns it. */
  override def evaluate(dataset: Dataset[_]): Double = {
    val schema = dataset.schema
    checkColumnTypes(schema, $(predictionCol), Seq(DoubleType, FloatType))
    checkNumericType(schema, $(labelCol))

    // RegressionMetrics consumes an RDD of (prediction, label) pairs, hence `.rdd`.
    val predictionAndLabels = dataset
      .select(col($(predictionCol)).cast(DoubleType), col($(labelCol)).cast(DoubleType))
      .rdd
      .map { case Row(prediction: Double, label: Double) => (prediction, label) }
    val metrics = new RegressionMetrics(predictionAndLabels)
    val metric = $(metricName) match {
      case "rmse" => metrics.rootMeanSquaredError
      case "mse" => metrics.meanSquaredError
      case "r2" => metrics.r2
      case "mae" => metrics.meanAbsoluteError
    }
    println(s"$i $metric") // Print the metrics
    i += 1 // Update counter
    metric
  }

  /** Only r2 improves as it grows. The error metrics must be minimized; the inherited
    * default (`true`) would make model selection pick the worst model.
    */
  override def isLargerBetter: Boolean = $(metricName) == "r2"

  // Return this class, not RegressionEvaluator: defaultCopy builds a CustomRegressionEvaluator.
  override def copy(extra: ParamMap): CustomRegressionEvaluator = defaultCopy(extra)
}

/** Companion of `CustomRegressionEvaluator`: lets a saved instance be reloaded with `load(path)`.
  *
  * It must be named after the class. Spark resolves the reader of a persisted evaluator
  * through the companion of its class name, and a `RegressionEvaluator` object here would
  * collide with Spark's own evaluator.
  */
object CustomRegressionEvaluator extends DefaultParamsReadable[CustomRegressionEvaluator] {

  override def load(path: String): CustomRegressionEvaluator = super.load(path)
}

/** Mixin providing the `predictionCol` parameter (default "prediction").
  *
  * Public rather than `private[ml]`: that qualifier only compiles inside a package named `ml`.
  */
trait HasPredictionCol extends Params {

  /**
   * Param for prediction column name.
   * @group param
   */
  final val predictionCol: Param[String] = new Param[String](this, "predictionCol", "prediction column name")

  setDefault(predictionCol, "prediction")

  /** @group getParam */
  final def getPredictionCol: String = $(predictionCol)
}

/** Mixin providing the `labelCol` parameter (default "label").
  *
  * Public rather than `private[ml]`: that qualifier only compiles inside a package named `ml`.
  */
trait HasLabelCol extends Params {

  /**
   * Param for label column name.
   * @group param
   */
  final val labelCol: Param[String] = new Param[String](this, "labelCol", "label column name")

  setDefault(labelCol, "label")

  /** @group getParam */
  final def getLabelCol: String = $(labelCol)
}

If you are coming from Databricks, here is my code for a Databricks notebook using PySpark. It is just an adaptation of eliasah's code.

from pyspark.ml.regression import (
    LinearRegression,
    LinearRegressionModel,
)
from pyspark.ml.regression import FMRegressor, FMRegressionModel
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.mllib.util import LinearDataGenerator, MLUtils
from pyspark.ml.param import Params
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark import pandas

# Assumes a Databricks notebook, where `spark` (the SparkSession) is predefined.

# Generate data: an RDD of LabeledPoint, turned into a DataFrame ("features", "label").
dfdata = LinearDataGenerator.generateLinearRDD(
    spark.sparkContext, nexamples=10_000, nfeatures=3, eps=0.25
).toDF()

# The generator yields old mllib vectors; pyspark.ml estimators need ml vectors.
dfdataML = MLUtils.convertVectorColumnsToML(dfdata, "features")

# Create model of linear regression.
linear_regressor = LinearRegression(maxIter=1_000)

# 1 regParam x 2 fitIntercept x 2 elasticNetParam = 4 sets of parameters.
parameter_grid = (
    ParamGridBuilder()
    .baseOn({linear_regressor.labelCol: "label"})
    .baseOn([linear_regressor.predictionCol, "predicted"])
    .addGrid(linear_regressor.regParam, [0.001])
    .addGrid(linear_regressor.fitIntercept, [True, False])
    .addGrid(linear_regressor.elasticNetParam, [0.5, 0.1])
    .build()
)

regression_evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="predicted", metricName="rmse"
)

# Create trainer using validation split to evaluate which set of parameters performs the best.
# I'm using the regular RegressionEvaluator here
train_validation_split = TrainValidationSplit(
    estimator=linear_regressor,
    estimatorParamMaps=parameter_grid,
    evaluator=regression_evaluator,
    trainRatio=0.8,  # 80% training 20% validation
    collectSubModels=True,  # To get all trained models during gridsearch
)

# Run grid search training
trained_grid_search = train_validation_split.fit(dfdataML)

parameter_grid = trained_grid_search.getEstimatorParamMaps()
models = trained_grid_search.subModels

# One row per (parameter set, optimizer iteration): the learning curve of each sub-model.
results = list()
for params, model in zip(parameter_grid, models):
    # Keep only the tuned parameters; labelCol/predictionCol were fixed with baseOn().
    pdict = {p.name: v for p, v in params.items() if p.name not in ['labelCol', 'predictionCol']}
    group = str(pdict)  # one label per curve, e.g. for coloring a plot

    for iteration, cost in enumerate(model.summary.objectiveHistory):
        d = pdict.copy()
        d.update({'it': iteration, 'cost': cost, 'group': group})
        results.append(d)

dfresults = pandas.DataFrame(results).to_spark()

enter image description here

Upvotes: 0


Here is a possible solution for the specific case of `LinearRegression` and any other algorithm that supports an objective history (here, `LinearRegressionTrainingSummary` does the job).

Let's first create a minimal, complete, and verifiable example:

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.mllib.util.{LinearDataGenerator, MLUtils}
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder().getOrCreate()

import spark.implicits._

// Synthetic linear data: an RDD of LabeledPoint, turned into a DataFrame ("label", "features").
val data = {
  val tmp = LinearDataGenerator.generateLinearRDD(
    spark.sparkContext,
    nexamples = 10000,
    nfeatures = 4,
    eps = 0.05
  ).toDF

  // The generator produces old mllib vectors; spark.ml estimators need ml vectors.
  MLUtils.convertVectorColumnsToML(tmp, "features")
}

As you may have noticed, when you need test data for spark-mllib or spark-ml, it's advisable to use the built-in data generators.

Now, let's train a linear regressor:

// Create model of linear regression.
val lr = new LinearRegression().setMaxIter(1000)

// The following line will create two sets of parameters (fitIntercept = true and false).
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.001))
  .addGrid(lr.fitIntercept)
  .addGrid(lr.elasticNetParam, Array(0.5))
  .build()

// Create trainer using validation split to evaluate which set of parameters performs the best.
// I'm using the regular RegressionEvaluator here
val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8) // 80% of the data will be used for training and the remaining 20% for validation.
  .setCollectSubModels(true) // keep every fitted sub-model so its training summary can be read

// To retrieve subModels, make sure to set collectSubModels to true before fitting.
// Run train validation split, and choose the best set of parameters.
val model = trainValidationSplit.fit(data)

Now that our model is trained, all we need is to get the objective history.

The following part needs a bit of gymnastics between the model and sub-models object parameters.

If you use a Pipeline or something similar, this code needs to be modified, so use it carefully. It's just an example:

// For every linear-regression sub-model: its tuned parameters and the objective value at each
// optimizer step. Sub-models of other types (e.g. a PipelineModel) are skipped by `collect`
// rather than raising a MatchError.
val objectiveHist = model.subModels.toSeq.collect {
  case m: LinearRegressionModel =>
    val history: Array[Double] = m.summary.objectiveHistory
    // One key per recorded value (0 = initial state). The former `1 until history.length`
    // was one element short, so the zip silently dropped the final, converged objective.
    val idx: Seq[Int] = history.indices
    // regParam, elasticNetParam, fitIntercept. They are read from the fitted model itself,
    // because ParamMap.toSeq has no guaranteed order.
    (m.getRegParam.toString, m.getElasticNetParam.toString, m.getFitIntercept.toString,
      idx.zip(history).toMap)
}.toDF("regParam", "elasticNetParam", "fitIntercept", "objectiveHistory")

We can now examine those metrics:
// +--------+---------------+------------+-------------------------------------------------------------------------------------------------------+
// |regParam|elasticNetParam|fitIntercept|objectiveHistory                                                                                       |
// +--------+---------------+------------+-------------------------------------------------------------------------------------------------------+
// |0.001   |0.5            |true        |[1 -> 0.4999999999999999, 2 -> 0.4038796441909531, 3 -> 0.02659222058006269, 4 -> 0.026592220340980147]|
// |0.001   |0.5            |false       |[1 -> 0.5000637621421942, 2 -> 0.4039303922115196, 3 -> 0.026592220673025396, 4 -> 0.02659222039347222]|
// +--------+---------------+------------+-------------------------------------------------------------------------------------------------------+

You can notice that the training process actually stops after 4 iterations.

If you just want the number of iterations, you can do the following instead:

// For every linear-regression sub-model: its tuned parameters and the length of its objective
// history. Per the Spark docs, objectiveHistory also holds the initial state, so this is one more
// than `m.summary.totalIterations` when an iterative solver is used.
val objectiveHist2 = model.subModels.toSeq.collect {
  case m: LinearRegressionModel =>
    val history: Array[Double] = m.summary.objectiveHistory
    // regParam, elasticNetParam, fitIntercept. They are read from the fitted model,
    // because ParamMap.toSeq has no guaranteed order.
    (m.getRegParam.toString, m.getElasticNetParam.toString, m.getFitIntercept.toString, history.size)
}.toDF("regParam", "elasticNetParam", "fitIntercept", "iterations")

I've changed the number of features in the generator (nfeatures = 100) for the sake of demonstration:
// +--------+---------------+------------+----------+
// |regParam|elasticNetParam|fitIntercept|iterations|
// +--------+---------------+------------+----------+
// |   0.001|            0.5|        true|        11|
// |   0.001|            0.5|       false|        11|
// +--------+---------------+------------+----------+

Upvotes: 9

