pltrdy

Reputation: 2109

rdd.toDF() changes float to None

(Using Apache Spark 1.6.0) Hi all,

I have a SparseVector (which is basically defined by two numpy arrays, values and indices). I want to get the highest values and their indices; to do it I use:

r = df.map(lambda row: Row(**dict(row.asDict(), top=f(vec))))

where the function f returns [ [sorted_indices], [sorted_values] ], computed the following way:

def f(v):
    # pair each value with its index and sort by value, descending
    m, i = zip(*sorted(zip(v.values, v.indices), reverse=True))
    m = [float(j) for j in m]  # numpy floats -> builtin float
    i = [int(j) for j in i]    # numpy ints -> builtin int
    return [i, m]
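
A quick sanity check on a toy vector (a sketch, assuming pyspark.mllib.linalg, where SparseVector lives in Spark 1.6):

from pyspark.mllib.linalg import SparseVector

v = SparseVector(4, {1: 1.0, 3: 5.5})
f(v)  # [[3, 1], [5.5, 1.0]]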

At that point r is a pyspark.rdd.PipelinedRDD and I can check that my values are OK using, for example:

r.first().top[1]

The problem comes when I try to get a DataFrame using:

df2 = r.toDF()

Then my values are all None, i.e.

df2.first().top[1] # i.e. the highest values of the first Vector 

shows None.

So it really looks like the toDF() function destroys my data. It would be quite odd if Spark could not handle the builtin float type.
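
For reference, a minimal sketch of the inference problem on a toy row (assuming an active SparkContext sc): the inner lists get typed after the first one, so the floats no longer fit.

from pyspark.sql import Row

rdd = sc.parallelize([Row(top=[[3, 1], [5.5, 1.0]])])
rdd.toDF().printSchema()
# root
#  |-- top: array (containsNull = true)
#  |    |-- element: array (containsNull = true)
#  |    |    |-- element: long (containsNull = true)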

Any ideas? thx

Upvotes: 0

Views: 1749

Answers (1)

zero323

Reputation: 330163

It doesn't work because the types don't match. If you take a look at the types you'll see that the top column is represented as array<array<bigint>>, while the values should be array<float>. Your function should return an object that can be converted to a struct column, struct<array<bigint>, array<float>>. An obvious choice is either a tuple or a Row:

from pyspark.sql import Row

def f(v):
    m, i = zip(*sorted(zip(v.values, v.indices), reverse=True))
    m = [float(j) for j in m]
    i = [int(j) for j in i]
    # a Row maps cleanly onto a struct column with fields indices and values
    return Row(indices=i, values=m)
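
With that change the original map plus toDF pipeline works; a sketch, assuming the vector sits in a column named vec (hypothetical name):

r = df.map(lambda row: Row(**dict(row.asDict(), top=f(row.vec))))
df2 = r.toDF()
df2.first().top.values  # [5.5, 1.0] on the toy vector, instead of None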

Also, if the vector is already in a DataFrame, it is better to use a UDF here:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import (
    StructType, StructField, ArrayType, IntegerType, DoubleType)

schema = StructType([
    StructField("indices", ArrayType(IntegerType())),
    StructField("values", ArrayType(DoubleType()))
])

df.withColumn("top", udf(f, schema)(col("vec_column")))
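
A hypothetical end-to-end run, assuming Spark 1.6's pyspark.mllib.linalg.SparseVector, a SQLContext named sqlContext, and a column named vec_column:

from pyspark.mllib.linalg import SparseVector

df = sqlContext.createDataFrame(
    [(SparseVector(4, {1: 1.0, 3: 5.5}),)], ["vec_column"])

result = df.withColumn("top", udf(f, schema)(col("vec_column")))
result.select("top.indices", "top.values").first()
# Row(indices=[3, 1], values=[5.5, 1.0])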

Upvotes: 2
