Reputation: 33
I'm calculating the size of the indices within a __SparseVector__ using the Python API for Spark (PySpark).
from pyspark.ml.feature import VectorAssembler

def score_clustering(dataframe):
    assembler = VectorAssembler(inputCols = dataframe.drop("documento").columns, outputCol = "variables")
    data_transformed = assembler.transform(dataframe)
    data_transformed_rdd = data_transformed.select("documento", "variables").orderBy(data_transformed.documento.asc()).rdd
    count_variables = data_transformed_rdd.map(lambda row : [row[0], row[1].indices.size]).toDF(["id", "frequency"])
When I execute the action __.count()__ on the __count_variables__ dataframe, an error shows up:
AttributeError: 'numpy.ndarray' object has no attribute 'indices'
The main part to consider is:
data_transformed_rdd.map(lambda row : [row[0], row[1].indices.size]).toDF(["id", "frequency"])
I believe this chunk is related to the error, but I cannot understand why the exception mentions __numpy.ndarray__ when I'm doing the calculations by mapping that __lambda expression__, which takes as its argument a __SparseVector__ (created with the __assembler__).
Any suggestions? Does anyone know what I'm doing wrong?
Upvotes: 3
Views: 1490
Reputation: 971
There are two problems here. The first one is in the indices.size call: indices and size are two different attributes of the SparseVector class. size is the full length of the vector, while indices holds the positions of the non-zero values; size is not an attribute of indices. So, assuming that all your vectors are instances of the SparseVector class:
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(0, Vectors.sparse(4, [0, 1], [11.0, 2.0])),
                            (1, Vectors.sparse(4, [], [])),
                            (3, Vectors.sparse(4, [0, 1, 2], [2.0, 2.0, 2.0]))],
                           ["documento", "variables"])
df.show()
+---------+--------------------+
|documento| variables|
+---------+--------------------+
| 0|(4,[0,1],[11.0,2.0])|
| 1| (4,[],[])|
| 3|(4,[0,1,2],[2.0,2...|
+---------+--------------------+
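To make the difference between the two attributes concrete, this is what they look like on the first vector above (a quick sketch, using nothing beyond what pyspark.ml.linalg already provides):

v = Vectors.sparse(4, [0, 1], [11.0, 2.0])
v.size           # 4 -> the full length of the vector
v.indices        # the positions of the non-zero values, here array([0, 1])
len(v.indices)   # 2 -> the count we are actually after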
The solution is to use the len function:
df = df.rdd.map(lambda x: (x[0], x[1], len(x[1].indices)))\
           .toDF(["documento", "variables", "frecuencia"])
df.show()
+---------+--------------------+----------+
|documento| variables|frecuencia|
+---------+--------------------+----------+
| 0|(4,[0,1],[11.0,2.0])| 2|
| 1| (4,[],[])| 0|
| 3|(4,[0,1,2],[2.0,2...| 3|
+---------+--------------------+----------+
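If you prefer to stay in the DataFrame API instead of mapping over the RDD, the same count can be computed with a udf (a minimal sketch, still assuming every vector is sparse and starting from the original two-column DataFrame; the name count_nonzero is arbitrary, and the DenseVector caveat discussed below applies here as well):

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Counts the non-zero positions of a SparseVector column
count_nonzero = udf(lambda v: len(v.indices), IntegerType())
df.withColumn("frecuencia", count_nonzero("variables")).show()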
And here comes the second problem: VectorAssembler does not always generate SparseVectors. Depending on which is more efficient, either a SparseVector or a DenseVector can be generated, based on the number of zeros in the original vector. For example, suppose the following DataFrame:
df = spark.createDataFrame([(0, Vectors.sparse(4, [0, 1], [11.0, 2.0])),
                            (1, Vectors.dense([1., 1., 1., 1.])),
                            (3, Vectors.sparse(4, [0, 1, 2], [2.0, 2.0, 2.0]))],
                           ["documento", "variables"])
df.show()
+---------+--------------------+
|documento| variables|
+---------+--------------------+
| 0|(4,[0,1],[11.0,2.0])|
| 1| [1.0,1.0,1.0,1.0]|
| 3|(4,[0,1,2],[2.0,2...|
+---------+--------------------+
Document 1 contains a DenseVector, and the previous solution does not work because DenseVectors do not have an indices attribute. So you have to use a more general representation of vectors that works with a DataFrame containing both sparse and dense vectors, for example numpy:
import numpy as np
df = df.rdd.map(lambda x: (x[0],
                           x[1],
                           np.nonzero(x[1])[0].size))\
           .toDF(["documento", "variables", "frecuencia"])
df.show()
+---------+--------------------+----------+
|documento| variables|frecuencia|
+---------+--------------------+----------+
| 0|(4,[0,1],[11.0,2.0])| 2|
| 1| [1.0,1.0,1.0,1.0]| 4|
| 3|(4,[0,1,2],[2.0,2...| 3|
+---------+--------------------+----------+
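As a final note, both DenseVector and SparseVector in pyspark.ml.linalg expose a numNonzeros() method, so (assuming that method is available in your Spark version) the same count can be written without numpy at all, again starting from the two-column DataFrame:

# numNonzeros() is defined on both vector types, so this map does not
# care which representation VectorAssembler chose for each row
# (sketch; run it on the DataFrame before the "frecuencia" column is added)
df = df.rdd.map(lambda x: (x[0], x[1], x[1].numNonzeros()))\
           .toDF(["documento", "variables", "frecuencia"])
df.show()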
Upvotes: 3