How can I convert the output of my VectorAssembler to a dense vector rather than a sparse vector?
val featureIndexer = new VectorAssembler().setInputCols(Array("feature1","feature2","feature3")).setOutputCol("indexedFeatures")
training_set_combined = training_set_combined.na.fill(-9999)
testing_set_combined = testing_set_combined.na.fill(-9999)
// training
val assembler = new VectorAssembler().setInputCols(feature_names.toArray).setOutputCol("features")
def get_param(): mutable.HashMap[String, Any] = {
val params = new mutable.HashMap[String, Any]()
params += "eta" -> 0.1f
params += "num_round" -> 150
params += "missing" -> -999
params += "subsample" -> 1
params += "objective" -> "binary:logistic"
return params
}
val xgb = new XGBoostClassifier(get_param().toMap).setLabelCol("label").setFeaturesCol("features")
val pipeline = new Pipeline().setStages(Array(assembler, xgb))
val xgbclassifier = pipeline.fit(training_set_combined)
I am looking to convert the VectorAssembler output to a dense vector.
Here is the dense-vector transformer implementation:
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.linalg.SQLDataTypes
import org.apache.spark.ml.param.shared.{HasInputCols, HasOutputCols}
import org.apache.spark.ml.param.{ParamMap, Params}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._
class DenseVectorConverter(val uid: String) extends Transformer with Params
with HasInputCols with HasOutputCols with DefaultParamsWritable {
def this() = this(Identifiable.randomUID("denseVectorConverter"))
/** @group setParam */
def setInputCols(value: Array[String]): this.type = set(inputCols, value)
/** @group setParam */
def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
def validateAndTransformSchema(schema: StructType): StructType = {
require($(inputCols).length == $(inputCols).distinct.length, s"inputCols contains" +
s" duplicates: (${$(inputCols).mkString(", ")})")
require($(outputCols).length == $(outputCols).distinct.length, s"outputCols contains" +
s" duplicates: (${$(outputCols).mkString(", ")})")
require($(inputCols).length == $(outputCols).length, s"inputCols(${$(inputCols).length})" +
s" and outputCols(${$(outputCols).length}) should have the same length")
$(inputCols).zip($(outputCols)).foldLeft(schema) { (schema, inOutCol) =>
val inputField = schema(inOutCol._1)
require(inputField.dataType == SQLDataTypes.VectorType, s"Expected data type of input col: ${inputField.name} as " +
s"vector but found ${inputField.dataType}")
schema.add(inOutCol._2, inputField.dataType, inputField.nullable, inputField.metadata)
}
}
override def transformSchema(schema: StructType): StructType = validateAndTransformSchema(schema)
override def copy(extra: ParamMap): DenseVectorConverter = defaultCopy(extra)
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
val sparseToDense =
udf((v: org.apache.spark.ml.linalg.Vector) => v.toDense)
$(inputCols).zip($(outputCols)).foldLeft(dataset.toDF()) { (df, inputColOutputCol) =>
df.withColumn(inputColOutputCol._2,
sparseToDense(col(inputColOutputCol._1)))
}
}
}
object DenseVectorConverter extends DefaultParamsReadable[DenseVectorConverter] {
override def load(path: String): DenseVectorConverter = super.load(path)
}
I tested it using the test case below:
import org.apache.spark.ml.linalg.Vectors
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features")
df.show(false)
// +---------------------+
// |features |
// +---------------------+
// |(5,[1,3],[1.0,7.0]) |
// |[2.0,0.0,3.0,4.0,5.0]|
// |[4.0,0.0,0.0,6.0,7.0]|
// +---------------------+
val denseVectorConverter = new DenseVectorConverter()
.setInputCols(Array("features"))
.setOutputCols(Array("features_dense"))
denseVectorConverter.transform(df).show(false)
// +---------------------+---------------------+
// |features |features_dense |
// +---------------------+---------------------+
// |(5,[1,3],[1.0,7.0]) |[0.0,1.0,0.0,7.0,0.0]|
// |[2.0,0.0,3.0,4.0,5.0]|[2.0,0.0,3.0,4.0,5.0]|
// |[4.0,0.0,0.0,6.0,7.0]|[4.0,0.0,0.0,6.0,7.0]|
// +---------------------+---------------------+
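If you don't need a reusable pipeline stage, the same toDense conversion can also be applied inline with a plain UDF right after the assembler. A minimal sketch, assuming assembled is the DataFrame produced by assembler.transform (the name is only illustrative):
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}
// One-off conversion: densify the assembler output into a new column.
val toDenseUdf = udf((v: Vector) => v.toDense)
val densified = assembled.withColumn("features_dense", toDenseUdf(col("features")))
The custom transformer is still the better fit for your case, because it can sit inside the Pipeline between the assembler and the XGBoost stage.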
Now your modified code should look like this:
val featureIndexer = new VectorAssembler().setInputCols(Array("feature1","feature2","feature3")).setOutputCol("indexedFeatures")
training_set_combined = training_set_combined.na.fill(-9999)
testing_set_combined = testing_set_combined.na.fill(-9999)
// training
val assembler = new VectorAssembler().setInputCols(feature_names.toArray).setOutputCol("features")
val denseVectorConverter = new DenseVectorConverter()
.setInputCols(Array("features"))
.setOutputCols(Array("features_dense"))
def get_param(): mutable.HashMap[String, Any] = {
val params = new mutable.HashMap[String, Any]()
params += "eta" -> 0.1f
params += "num_round" -> 150
params += "missing" -> -999
params += "subsample" -> 1
params += "objective" -> "binary:logistic"
return params
}
val xgb = new XGBoostClassifier(get_param().toMap).setLabelCol("label").setFeaturesCol("features_dense")
val pipeline = new Pipeline().setStages(Array(assembler, denseVectorConverter, xgb))
val xgbclassifier = pipeline.fit(training_set_combined)
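Once fitted, the resulting PipelineModel applies all three stages (assembler, dense-vector converter, XGBoost model) when scoring. A minimal sketch, assuming testing_set_combined contains the same feature columns (the exact prediction columns depend on your XGBoost4J-Spark version):
// Score the test set with the fitted pipeline.
val predictions = xgbclassifier.transform(testing_set_combined)
predictions.select("label", "prediction", "probability").show(5, false)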
I've modified your code accordingly; please test it on your end.