YaleBD
YaleBD

Reputation: 163

Spark ML insert/fit custom OneHotEncoder into a Pipeline

Say I have a few features/columns in a dataframe on which I apply the regular OneHotEncoder, and one (let, n-th) column on which I need to apply my custom OneHotEncoder. Then I need to use VectorAssembler to assemble those features, and put into a Pipeline, finally fitting my trainData and getting predictions from my testData, such as:

val sIndexer1 = new StringIndexer().setInputCol("my_feature1").setOutputCol("indexed_feature1")
// ... let, n-1 such sIndexers for n-1 features
val featureEncoder = new OneHotEncoderEstimator().setInputCols(Array(sIndexer1.getOutputCol), ...).
      setOutputCols(Array("encoded_feature1", ... ))

// **need to insert output from my custom OneHotEncoder function (please see below)**
// (which takes the n-th feature as input) in a way that matches the VectorAssembler below

val vectorAssembler = new VectorAssembler().setInputCols(featureEncoder.getOutputCols + ???).
      setOutputCol("assembled_features")

...

val pipeline = new Pipeline().setStages(Array(sIndexer1, ...,featureEncoder, vectorAssembler, myClassifier))
val model = pipeline.fit(trainData)
val predictions = model.transform(testData)

How can I modify the building of the vectorAssembler so that it can ingest the output from the custom OneHotEncoder? The problem is my desired oheEncodingTopN() cannot/should not refer to the "actual" dataframe, since it would be a part of the pipeline (to apply on trainData/testData).

Note:

I tested that the custom OneHotEncoder (see link) works just as expected separately on e.g. trainData. Basically, oheEncodingTopN applies OneHotEncoding on the input column, but for the top N frequent values only (e.g. N = 50), and put all the rest infrequent values in a dummy column (say, "default"), e.g.:

val oheEncoded = oheEncodingTopN(df, "my_featureN", 50)

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, when}
import org.apache.spark.sql.Column


def flip(col: Column): Column = when(col === 1, lit(0)).otherwise(lit(1))

def oheEncodingTopN(df: DataFrame, colName: String, n: Int): DataFrame = {
  df.createOrReplaceTempView("data")
  val topNDF = spark.sql(s"select $colName, count(*) as count from data group by $colName order by count desc limit $n")

  val pivotTopNDF = topNDF.
    groupBy(colName).
    pivot(colName).
    count().
    withColumn("default", lit(1))

  val joinedTopNDF = df.join(pivotTopNDF, Seq(colName), "left").drop(colName)

  val oheEncodedDF = joinedTopNDF.
    na.fill(0, joinedTopNDF.columns).
    withColumn("default", flip(col("default")))

   oheEncodedDF

}

Upvotes: 2

Views: 272

Answers (1)

Simon Delecourt
Simon Delecourt

Reputation: 1599

I think the cleanest way would be to create your own class that extends spark ML Transformer so that you can play with as you would do with any other transformer (like OneHotEncoder). Your class would look like this :

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.Param
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset, Column}

class OHEncodingTopN(n :Int, override val uid: String) extends Transformer {
  final val inputCol= new Param[String](this, "inputCol", "The input column")
  final val outputCol = new Param[String](this, "outputCol", "The output column")

 ; def setInputCol(value: String): this.type = set(inputCol, value)

  def setOutputCol(value: String): this.type = set(outputCol, value)

  def this(n :Int) = this(n, Identifiable.randomUID("OHEncodingTopN"))

  def copy(extra: ParamMap): OHEncodingTopN = {
    defaultCopy(extra)
  }

  override def transformSchema(schema: StructType): StructType = {
    // Check that the input type is what you want if needed 
    //     val idx = schema.fieldIndex($(inputCol))
    //     val field = schema.fields(idx)
    //     if (field.dataType != StringType) {
    //       throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
    //     }
    // Add the return field
    schema.add(StructField($(outputCol), IntegerType, false))
  }
  def flip(col: Column): Column = when(col === 1, lit(0)).otherwise(lit(1))

  def transform(df: Dataset[_]): DataFrame = {
      df.createOrReplaceTempView("data")
      val colName = $(inputCol)
      val topNDF = df.sparkSession.sql(s"select $colName, count(*) as count from data group by $colName order by count desc limit $n")

      val pivotTopNDF = topNDF.
        groupBy(colName).
        pivot(colName).
        count().
        withColumn("default", lit(1))

      val joinedTopNDF = df.join(pivotTopNDF, Seq(colName), "left").drop(colName)

      val oheEncodedDF = joinedTopNDF.
        na.fill(0, joinedTopNDF.columns).
        withColumn("default", flip(col("default")))

       oheEncodedDF
  }
}

Now on a OHEncodingTopN object you should be able to call .getOuputCol to perform what you want. Good luck.

EDIT: your method that I just copy pasted in the transform method should be slightly modified in order to output a column of type Vector having the name given in the setOutputCol.

Upvotes: 2

Related Questions