renegademonkey

Reputation: 467

Explode sparse features vector into separate columns

In my Spark DataFrame I have a column that contains the output of a CountVectorizer transformation - it is in sparse vector format. What I am trying to do is 'explode' this column into a dense vector and then into its component columns (so that it can be used for scoring by an external model).

I know there are 40 features in the column, so, following this example, I have tried:

import org.apache.spark.sql.functions.udf
import org.apache.spark.mllib.linalg.Vector

// convert sparse vector to a dense vector, and then to array<double> 
val vecToSeq = udf((v: Vector) => v.toArray)

// Prepare a list of columns to create
val exprs = (0 until 40).map(i => $"_tmp".getItem(i).alias(s"exploded_col$i"))
testDF.select(vecToSeq($"features").alias("_tmp")).select(exprs:_*)

However, I get a weird error (see the full trace below):

data type mismatch: argument 1 requires vector type, however, 'features' is of vector type.;

Now it appears that the CountVectorizer may have created a vector of type ml.linalg.Vector, so I have alternatively tried importing:

import org.apache.spark.ml.linalg.{Vector, DenseVector, SparseVector}

And then I get an error with the following cause:

Caused by: java.lang.ClassCastException: org.apache.spark.ml.linalg.SparseVector cannot be cast to org.apache.spark.sql.Row

I have also tried converting the ml vector by altering the UDF to:

val vecToSeq = udf((v: Vector) =>  org.apache.spark.mllib.linalg.Vectors.fromML(v.toDense).toArray )

And I get a similar cannot be cast to org.apache.spark.sql.Row error. Can anyone tell me why this is not working? Is there an easier way to explode a sparse vector in a DataFrame into separate columns? I've spent hours on this and cannot figure it out.

EDIT: The schema shows the features column simply as a vector:

  |-- features: vector (nullable = true)

Full error trace:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(features)' due to data type mismatch: argument 1 requires vector type, however, 'features' is of vector type.;;
Project [UDF(features#325) AS _tmp#463]
. . . 
org.apache.spark.sql.cassandra.CassandraSourceRelation@47eae91d

        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2872)
        at org.apache.spark.sql.Dataset.select(Dataset.scala:1153)
        at uk.nominet.renewals.prediction_test$.prediction_test(prediction_test.scala:292)
        at 

Upvotes: 3

Views: 3034

Answers (2)

Shaido

Reputation: 28352

It appears to be an issue with your import statements. As you noticed, CountVectorizer uses the ml package vectors, so all vector imports should also come from that package. Make sure you do not have any imports using the older mllib package. These include:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.DenseVector
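
The ml package equivalents should be used instead:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.ml.linalg.DenseVector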

There are some methods only present in the mllib package, so in case you actually need to use that type of vector, you can rename it on import (since the name is otherwise the same as the ml vectors). For example:

import org.apache.spark.mllib.linalg.{Vector => mllibVector}
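
Alternatively, if a specific method really requires the old mllib type, you can convert explicitly at the point of use instead of relying on the import - a minimal sketch, where someMlVector is just an illustrative name for an ml vector you already have:

import org.apache.spark.mllib.linalg.{Vectors => MLlibVectors}

// Explicit, local conversion from an ml vector to its mllib counterpart
val mllibVec = MLlibVectors.fromML(someMlVector)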

After fixing all imports, your code should run. Test:

import org.apache.spark.ml.feature.CountVectorizer

val df = Seq((1L, Seq("word1", "word2", "word3")), (2L, Seq("word2", "word4"))).toDF("id", "words")
val countVec = new CountVectorizer().setInputCol("words").setOutputCol("features")
val testDF = countVec.fit(df).transform(df)

This will give a test dataframe as follows:

+---+--------------------+--------------------+
| id|               words|            features|
+---+--------------------+--------------------+
|  1|[word1, word2, wo...|(4,[0,2,3],[1.0,1...|
|  2|      [word2, word4]| (4,[0,1],[1.0,1.0])|
+---+--------------------+--------------------+

Now, to give each index its own column:

// Vector here must be the ml one, matching the CountVectorizer output
import org.apache.spark.ml.linalg.Vector

val vecToSeq = udf((v: Vector) => v.toArray)

val exprs = (0 until 4).map(i => $"features".getItem(i).alias(s"exploded_col$i"))
val df2 = testDF.withColumn("features", vecToSeq($"features")).select(exprs:_*)

Resulting dataframe:

+-------------+-------------+-------------+-------------+
|exploded_col0|exploded_col1|exploded_col2|exploded_col3|
+-------------+-------------+-------------+-------------+
|          1.0|          0.0|          1.0|          1.0|
|          1.0|          1.0|          0.0|          0.0|
+-------------+-------------+-------------+-------------+
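
If you would rather not hardcode the number of features (4 here, 40 in your case), one option - just a sketch reusing countVec and vecToSeq from above - is to derive it from the fitted model's vocabulary:

// Derive the column count from the fitted CountVectorizerModel instead of hardcoding it
val countVecModel = countVec.fit(df)
val numFeatures = countVecModel.vocabulary.length
val allExprs = (0 until numFeatures).map(i => $"features".getItem(i).alias(s"exploded_col$i"))
testDF.withColumn("features", vecToSeq($"features")).select(allExprs: _*)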

Upvotes: 1

GPI

Reputation: 9328

When working on cases like this, I often decompose the problem step by step to find out where the issue is coming from.

First, let's set up a dataframe:

import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.linalg.Vector
val df = sc.parallelize(Seq((1L, Seq("word1", "word2")))).toDF("id", "words")
val countModel = new CountVectorizer().setInputCol("words").setOutputCol("feature").fit(df)
val testDF = countModel.transform(df)
testDF.show

+---+--------------+-------------------+
| id|         words|            feature|
+---+--------------+-------------------+
|  1|[word1, word2]|(2,[0,1],[1.0,1.0])|
+---+--------------+-------------------+

Now, what I would like is to select, say, the first column of feature, that is to say, to extract the first coordinate of the feature vector.

That could be written v(0). Now I want my dataframe to have a column that holds v(0), where v is the content of the feature column. I can use a user-defined function for that:

val firstColumnExtractor = udf((v: Vector) => v(0))

Then I try to add this column to my testDF:

testDF.withColumn("feature_0", firstColumnExtractor($"feature")).show
+---+--------------+-------------------+---------+                              
| id|         words|            feature|feature_0|
+---+--------------+-------------------+---------+
|  1|[word1, word2]|(2,[0,1],[1.0,1.0])|      1.0|
+---+--------------+-------------------+---------+

Note that I could just as well do it this way (this is just a matter of style, as far as I can tell):

testDF.select(firstColumnExtractor($"feature").as("feature_0")).show

This works, but it is a lot of work to repeat. Let's automate. First, I can generalize the extraction function to work at any index by creating a higher-order function (a function that creates functions):

def columnExtractor(idx: Int) = udf((v: Vector) => v(idx))

Now I can rewrite the previous example:

testDF.withColumn("feature_0", columnExtractor(0)($"feature")).show

OK, so now I could do it this way:

testDF.withColumn("feature_0", columnExtractor(0)($"feature"))
      .withColumn("feature_1", columnExtractor(1)($"feature"))

That works for one column, but what about 39 dimensions? Well, let's automate some more. The above is really a foldLeft operation over each dimension:

(0 to 39).foldLeft(testDF)((df, idx) => df.withColumn("feature_"+idx, columnExtractor(idx)($"feature")))

This is just another way of writing your version with multiple selects:

import org.apache.spark.sql.functions.col

val featureCols = (0 to 1).map(idx => columnExtractor(idx)($"feature").as("feature_"+idx))
testDF.select((col("*") +: featureCols):_*).show
+---+--------------+-------------------+---------+---------+
| id|         words|            feature|feature_0|feature_1|
+---+--------------+-------------------+---------+---------+
|  1|[word1, word2]|(2,[0,1],[1.0,1.0])|      1.0|      1.0|
+---+--------------+-------------------+---------+---------+

Now, for performance reasons, you might want to convert your base Vector to an array of coordinates (or a DenseVector). Feel free to do that. I feel like a DenseVector or an Array will probably be very close performance-wise, so I would write it this way:

// A function to densify the feature vector
val toDense = udf((v:Vector) => v.toDense)
// Replace testDF's feature column with its dense equivalent
val denseDF = testDF.withColumn("feature", toDense($"feature"))
// Work on denseDF as we did on testDF 
denseDF.select((col("*") +: featureCols):_*).show
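
As a side note, on recent Spark versions (3.0 and later, if I am not mistaken) there is a built-in ml function that turns a vector column into an array column, so you would not need a hand-rolled UDF at all. A sketch, reusing testDF from above:

import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.sql.functions.col

// Built-in alternative to the densifying / extracting UDFs (Spark 3.0+)
val arrayDF = testDF.withColumn("feature", vector_to_array($"feature"))
val arrayCols = (0 to 1).map(idx => $"feature".getItem(idx).as("feature_" + idx))
arrayDF.select((col("*") +: arrayCols): _*).show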

Upvotes: 4
