Reputation: 2109
(Using Apache Spark 1.6.0) Hi all,
I have a SparseVector (which is basically defined by two numpy arrays, namely values and indices).
I want to get the highest values and their indices. To do it I use:
r = df.map(lambda row: Row(**dict(row.asDict(), top=f(vec))))
where the function f returns [[sorted_indices], [sorted_values]] and is defined the following way:
def f(v):
    m, i = zip(*sorted(zip(v.values, v.indices), reverse=True))
    m = [float(j) for j in m]
    i = [int(j) for j in i]
    return [i, m]
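For illustration, here is a minimal sketch of what f returns for a small SparseVector (the example vector below is made up, not my real data):

from pyspark.mllib.linalg import SparseVector

# a made-up 5-dimensional sparse vector with three non-zero entries
vec = SparseVector(5, {0: 0.1, 2: 0.7, 4: 0.3})

f(vec)
# [[2, 4, 0], [0.7, 0.3, 0.1]]  -> indices and values, sorted by descending value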
At that point r is a pyspark.rdd.PipelinedRDD and I can check my values are OK using, for example:
r.first().top[1]
The problem comes when I try to get a DataFrame using:
df2 = r.toDF()
Then my values are only None, i.e.
df2.first().top[1]  # i.e. the highest values of the first vector
shows None.
So it really looks like the toDF() function destroys my data. It would be quite odd if Spark could not handle the builtin float type.
Any ideas? thx
Upvotes: 0
Views: 1749
Reputation: 330163
It doesn't work because the types don't match. If you take a look at the types you'll see that the top column is represented as array<array<bigint>>, while the values should be array<float>. Your function should return an object that can be converted to a struct column, struct<array<bigint>, array<float>>. An obvious choice is either a tuple or a Row:
from pyspark.sql import Row

def f(v):
    m, i = zip(*sorted(zip(v.values, v.indices), reverse=True))
    m = [float(j) for j in m]
    i = [int(j) for j in i]
    return Row(indices=i, values=m)
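For example, a minimal end-to-end sketch in the pyspark 1.6 shell (where sc and sqlContext already exist); the column name vec and the toy data are assumptions for illustration, not taken from your code:

from pyspark.mllib.linalg import SparseVector

# toy DataFrame with an id column and a sparse vector column named vec
df = sc.parallelize([
    Row(id=1, vec=SparseVector(5, {0: 0.1, 2: 0.7, 4: 0.3}))
]).toDF()

r = df.map(lambda row: Row(**dict(row.asDict(), top=f(row.vec))))
df2 = r.toDF()
df2.first().top[1]  # [0.7, 0.3, 0.1] instead of None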
Also, if the vector is already in a DataFrame, it is better to use a UDF here:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import *

schema = StructType([
    StructField("indices", ArrayType(IntegerType())),
    StructField("values", ArrayType(DoubleType()))
])

df.withColumn("top", udf(f, schema)(col("vec_column")))
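With the toy DataFrame from the sketch above (again, the column name vec is just an assumption), that would look like:

get_top = udf(f, schema)                         # wrap f as a UDF returning the struct schema
df2 = df.withColumn("top", get_top(col("vec")))
df2.first().top[1]                               # [0.7, 0.3, 0.1]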
Upvotes: 2