steven

Reputation: 683

MinMax Normalization in Scala

I have an org.apache.spark.sql.DataFrame with multiple columns. I want to scale one column (lat_long_dist) using MinMax normalization, or any other technique that scales the data between -1 and 1, while retaining the data type org.apache.spark.sql.DataFrame

scala> val df = sqlContext.csvFile("tenop.csv")
df: org.apache.spark.sql.DataFrame = [gst_id_matched: string,
  ip_crowding: string, lat_long_dist: double, stream_name_1: string]

I found the StandardScaler option, but it requires me to transform the dataset before I can apply the scaling. Is there a simple, clean way?
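(For reference, the mllib StandardScaler route referred to above presumably requires something like the sketch below; this is an assumption about the Spark 1.x mllib API, not code from the original question. Note the detour through RDD[Vector] and back, which is the extra transformation step in question.)

import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vectors

// Leave the DataFrame and scale as RDD[Vector]
val vectors = df.select("lat_long_dist").rdd.map(r => Vectors.dense(r.getDouble(0)))
val stdScaler = new StandardScaler(withMean = true, withStd = true).fit(vectors)
val scaledRdd = stdScaler.transform(vectors) // RDD[Vector], no longer a DataFrame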

Upvotes: 10

Views: 8532

Answers (3)

ShawnXuan

Reputation: 21

Here is another solution. It takes code from Matt, Lyle and zero323, thanks!

import org.apache.spark.ml.feature.{MinMaxScaler, VectorAssembler}

val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")

// Assemble the column into a one-element vector column, since the scaler operates on vectors
val assembler = new VectorAssembler().setInputCols(Array("v")).setOutputCol("vVec")
val df2 = assembler.transform(df)

// Scale the vector column to the [-1, 1] range
val scaler = new MinMaxScaler().setInputCol("vVec").setOutputCol("vScaled").setMax(1).setMin(-1)

scaler.fit(df2).transform(df2).show

result:

+---+-----+-------+--------------------+
|  k|    v|   vVec|             vScaled|
+---+-----+-------+--------------------+
|  1|  0.5|  [0.5]|[-0.3093093093093...|
|  2| 10.2| [10.2]|[0.27327327327327...|
|  3|  5.7|  [5.7]|[0.00300300300300...|
|  4|-11.0|[-11.0]|              [-1.0]|
|  5| 22.3| [22.3]|               [1.0]|
+---+-----+-------+--------------------+

By the way, the other solutions produced an error on my side:

java.lang.IllegalArgumentException: requirement failed: Column vVec must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:43)
  at org.apache.spark.ml.feature.MinMaxScalerParams$class.validateAndTransformSchema(MinMaxScaler.scala:67)
  at org.apache.spark.ml.feature.MinMaxScaler.validateAndTransformSchema(MinMaxScaler.scala:93)
  at org.apache.spark.ml.feature.MinMaxScaler.transformSchema(MinMaxScaler.scala:129)
  at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
  at org.apache.spark.ml.feature.MinMaxScaler.fit(MinMaxScaler.scala:119)
  ... 50 elided

Thanks a lot anyway!
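(A likely cause, assuming Spark 2.x: the ml MinMaxScaler expects org.apache.spark.ml.linalg.Vector, while the UDF in Lyle's answer builds org.apache.spark.mllib.linalg.Vector. The two vector types have distinct UDTs that print the same schema, hence the confusing "must be ... but was actually ..." message. A minimal fix for the UDF approach is to build the vector with the ml package instead:)

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions.udf

// Build the one-element vector with ml.linalg (not mllib.linalg) so the ml MinMaxScaler accepts it
val vectorizeCol = udf( (v: Double) => Vectors.dense(v) )
val df2 = df.withColumn("vVec", vectorizeCol(df("v")))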

Upvotes: 2

Lyle

Reputation: 1398

Here's another suggestion, since you are already working with Spark.

Why not use the MinMaxScaler in the ml package?

Let's try it with the same example as zero323.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.sql.functions.udf

val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")

// Wrap the column in a one-element vector, since MinMaxScaler expects a vector column
val vectorizeCol = udf( (v: Double) => Vectors.dense(Array(v)) )
val df2 = df.withColumn("vVec", vectorizeCol(df("v")))

val scaler = new MinMaxScaler()
    .setInputCol("vVec")
    .setOutputCol("vScaled")
    .setMax(1)
    .setMin(-1)

scaler.fit(df2).transform(df2).show
+---+-----+-------+--------------------+
|  k|    v|   vVec|             vScaled|
+---+-----+-------+--------------------+
|  1|  0.5|  [0.5]|[-0.3093093093093...|
|  2| 10.2| [10.2]|[0.27327327327327...|
|  3|  5.7|  [5.7]|[0.00300300300300...|
|  4|-11.0|[-11.0]|              [-1.0]|
|  5| 22.3| [22.3]|               [1.0]|
+---+-----+-------+--------------------+
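If you want the scaled value back as a plain Double column rather than a one-element vector, you can pull it out with a UDF. (This step isn't in the original answer; it's a sketch assuming the result of scaler.fit(df2).transform(df2) above is saved as scaled, and that you're on Spark 1.x where the ml scaler works with mllib vectors; on Spark 2.x import org.apache.spark.ml.linalg.Vector instead.)

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.functions.udf

// Extract the single element of the scaled vector into a plain Double column
val firstElem = udf( (v: Vector) => v(0) )
val scaledDouble = scaled.withColumn("vScaledDouble", firstElem(scaled("vScaled")))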

You can also take advantage of this to scale multiple columns at once.

val df = sc.parallelize(Seq(
    (1.0, -1.0, 2.0),
    (2.0, 0.0, 0.0),
    (0.0, 1.0, -1.0)
)).toDF("a", "b", "c")

import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
    .setInputCols(Array("a", "b", "c"))
    .setOutputCol("features")

val df2 = assembler.transform(df)

// Reusing the scaler instance above with the same min(-1) and max(1) 
scaler.setInputCol("features").setOutputCol("scaledFeatures").fit(df2).transform(df2).show
+---+----+----+--------------+--------------------+
|  a|   b|   c|      features|      scaledFeatures|
+---+----+----+--------------+--------------------+
|1.0|-1.0| 2.0|[1.0,-1.0,2.0]|      [0.0,-1.0,1.0]|
|2.0| 0.0| 0.0| [2.0,0.0,0.0]|[1.0,0.0,-0.33333...|
|0.0| 1.0|-1.0|[0.0,1.0,-1.0]|     [-1.0,1.0,-1.0]|
+---+----+----+--------------+--------------------+

Upvotes: 22

zero323

Reputation: 330083

I guess what you want is something like this:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{min, max, lit}

val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")

val (vMin, vMax) = df.agg(min($"v"), max($"v")).first match {
  case Row(x: Double, y: Double) => (x, y)
}

val scaledRange = lit(2) // Range of the scaled variable
val scaledMin = lit(-1)  // Min value of the scaled variable
val vNormalized = ($"v" - vMin) / (vMax - vMin) // v normalized to (0, 1) range

val vScaled = scaledRange * vNormalized + scaledMin

df.withColumn("vScaled", vScaled).show

// +---+-----+--------------------+
// |  k|    v|             vScaled|
// +---+-----+--------------------+
// |  1|  0.5| -0.3093093093093092|
// |  2| 10.2| 0.27327327327327344|
// |  3|  5.7|0.003003003003003...|
// |  4|-11.0|                -1.0|
// |  5| 22.3|                 1.0|
// +---+-----+--------------------+
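If you need this for more than one column or target range, the same arithmetic can be wrapped in a small helper. (This helper is not from the original answer, just a sketch; it assumes the column is a non-constant Double column.)

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, min, max}

// Min-max scale a numeric column to [newMin, newMax]; assumes vMax != vMin
def minMaxScale(df: DataFrame, colName: String,
                newMin: Double = -1.0, newMax: Double = 1.0): DataFrame = {
  val Row(vMin: Double, vMax: Double) =
    df.agg(min(col(colName)), max(col(colName))).first
  val scaled = (col(colName) - vMin) / (vMax - vMin) * (newMax - newMin) + newMin
  df.withColumn(s"${colName}_scaled", scaled)
}

// e.g. minMaxScale(df, "v").show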

Upvotes: 13
