diens

Reputation: 659

Spark: convert an RDD[LabeledPoint] to a DataFrame to apply MinMaxScaler, then get the normalized RDD[LabeledPoint] back after scaling

I'm using RDD[LabeledPoint] in my code, but now I have to normalize the data using min-max scaling.

I saw that the ml library provides a MinMaxScaler (org.apache.spark.ml.feature.MinMaxScaler), but it works with DataFrames.

Because the rest of the code is already written with RDDs, I think I could follow these steps so that nothing else has to change:

  1. Convert the RDD[LabeledPoint] to DataFrame
  2. Apply MinMaxScaler to the DataFrame
  3. Convert the DataFrame to the RDD[LabeledPoint]

The thing is, I don't know how to do it. I don't have column names (the feature vector in each LabeledPoint has 9 dimensions), and I couldn't adapt other examples to my case, for instance the code in https://stackoverflow.com/a/36909553/5081366 or in "Scaling each column of a dataframe".

I would appreciate your help!

Upvotes: 1

Views: 846

Answers (1)

diens

Reputation: 659

Finally, I am able to answer my own question!

Here, allData is an RDD[LabeledPoint]:

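    // Assumes the DataFrame-based (ml) LabeledPoint and Vector. For the RDD-based
    // mllib types, convert with e.features.asML here and Vectors.fromML on the way back.
    import org.apache.spark.ml.feature.{LabeledPoint, MinMaxScaler}
    import org.apache.spark.ml.linalg.Vector
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .appName("Spark In Action")
      .master("local")
      .getOrCreate()

    // implicits lives on the SparkSession instance, so it can only be imported
    // once the session exists; it provides toDF on RDDs
    import spark.implicits._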

    // Create a DataFrame from RDD[LabeledPoint]
    val all = allData.map(e => (e.label, e.features))
    val df_all = all.toDF("labels", "features")

    // Scaler that rescales each feature to [0, 1] (the defaults, set explicitly here)
    val scaler = new MinMaxScaler()
      .setInputCol("features")
      .setOutputCol("featuresScaled")
      .setMax(1)
      .setMin(0)

    // Fit the scaler, transform the data, and drop the unscaled column
    val df_scaled = scaler
      .fit(df_all)
      .transform(df_all)
      .drop("features")

    // Convert DataFrame to RDD[LabeledPoint]
    val rdd_scaled = df_scaled.rdd.map(row => LabeledPoint(
      row.getAs[Double]("labels"),
      row.getAs[Vector]("featuresScaled")
    ))
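
For intuition about what fit and transform compute, here is a minimal sketch of the same per-feature rescaling on plain Scala collections, with no Spark involved. It follows the formula documented for MinMaxScaler: each value becomes (x - colMin) / (colMax - colMin) * (max - min) + min, and a constant column maps to the midpoint of the target range. The object MinMaxSketch and the helper minMaxScale are hypothetical names used only for illustration; they are not part of Spark.

```scala
// Hypothetical helper, not part of Spark: rescales every column of a small
// in-memory dataset to [lo, hi] using the min-max formula.
object MinMaxSketch {
  def minMaxScale(
      rows: Seq[Array[Double]],
      lo: Double = 0.0,
      hi: Double = 1.0
  ): Seq[Array[Double]] = {
    if (rows.isEmpty) Seq.empty
    else {
      val dims = rows.head.length
      // Per-column minimum and maximum (the statistics that fit learns)
      val colMin = Array.tabulate(dims)(j => rows.map(_(j)).min)
      val colMax = Array.tabulate(dims)(j => rows.map(_(j)).max)
      // Rescale every value (the work that transform does)
      rows.map { row =>
        Array.tabulate(dims) { j =>
          val range = colMax(j) - colMin(j)
          if (range == 0.0) 0.5 * (hi + lo) // constant column: midpoint of the target range
          else (row(j) - colMin(j)) / range * (hi - lo) + lo
        }
      }
    }
  }
}
```

Calling MinMaxSketch.minMaxScale on a few rows gives a quick way to sanity-check the values in the featuresScaled column for a small sample.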

I hope this will help someone else!

Upvotes: 1
