Reputation: 659
I'm using RDD[LabeledPoint] in my code, but now I have to normalize my data using the MinMax method.
I saw that the ml library has a MinMaxScaler, but it works with DataFrames: org.apache.spark.ml.feature.MinMaxScaler.
Because the full code was already written with RDDs, I think I could do the following steps so that nothing else has to change: convert the RDD[LabeledPoint] to a DataFrame, apply the MinMaxScaler, and convert the result back to an RDD[LabeledPoint].
The problem is that I don't know how to do it. I don't have column names (though the feature vector in each LabeledPoint has 9 dimensions), and I also couldn't adapt other examples to my case, for instance the code in https://stackoverflow.com/a/36909553/5081366 or Scaling each column of a dataframe.
I would appreciate your help!
Upvotes: 1
Views: 846
Finally, I was able to answer my own question! Given that allData is an RDD[LabeledPoint]:
// Imports, assuming the Spark 2.x ml API (ml.feature.LabeledPoint wraps an ml.linalg.Vector)
import org.apache.spark.ml.feature.{LabeledPoint, MinMaxScaler}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.SparkSession

// The implicits object is defined inside the SparkSession instance,
// so it can only be imported after the session has been created
val spark = SparkSession
  .builder()
  .appName("Spark In Action")
  .master("local")
  .getOrCreate()
import spark.implicits._
// Create a DataFrame from RDD[LabeledPoint]
val all = allData.map(e => (e.label, e.features))
val df_all = all.toDF("labels", "features")
// Create a MinMaxScaler that rescales each feature to the range [0, 1]
val scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("featuresScaled")
.setMax(1)
.setMin(0)
// Scaling
var df_scaled = scaler.fit(df_all).transform(df_all)
// Drop the unscaled column
df_scaled = df_scaled.drop("features")
// Convert DataFrame to RDD[LabeledPoint]
val rdd_scaled = df_scaled.rdd.map(row => LabeledPoint(
row.getAs[Double]("labels"),
row.getAs[Vector]("featuresScaled")
))
I hope this will help someone else!
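For completeness, the DataFrame round-trip can be avoided entirely. mllib has no MinMaxScaler, but the per-column min and max can be obtained from Statistics.colStats and applied directly on the RDD. This is only a sketch, assuming allData is an RDD of mllib LabeledPoints (the zero-range guard is my own addition for constant columns):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.Statistics

// Compute per-column summary statistics over the feature vectors
val stats = Statistics.colStats(allData.map(_.features))
val min = stats.min.toArray
val max = stats.max.toArray

// Rescale each feature to [0, 1]; constant columns are mapped to 0.0
val rdd_scaled = allData.map { lp =>
  val scaled = lp.features.toArray.zipWithIndex.map { case (v, i) =>
    val range = max(i) - min(i)
    if (range == 0.0) 0.0 else (v - min(i)) / range
  }
  LabeledPoint(lp.label, Vectors.dense(scaled))
}
```

This keeps everything in the mllib RDD API, at the cost of reimplementing what MinMaxScaler does for you.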
Upvotes: 1