Thagor

Reputation: 900

Is it possible to store a numpy array in a Spark Dataframe Column?

I have a DataFrame and I apply a function to it. This function returns a NumPy array. The code looks like this:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

create_vector_udf = udf(create_vector, ArrayType(FloatType()))
dataframe = dataframe.withColumn('vector', create_vector_udf('text'))
dataframe.select('lang', 'url', 'vector').show(20)

Spark does not accept this with ArrayType(FloatType()), and I get the following error message: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct)

I could call numpyarray.tolist() and return a list instead, but then I would have to recreate the array every time I want to use it with NumPy.

So is there a way to store a NumPy array in a DataFrame column?
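For a float32 array, the tolist() round trip mentioned above is lossless: every float32 value is exactly representable as a Python float, and np.asarray with the same dtype restores an identical array. A minimal NumPy-only sketch; create_vector here is a hypothetical stand-in for the asker's function, not the real one:

```python
import numpy as np

def create_vector(text):
    # Hypothetical stand-in: one float per character code, purely for illustration
    return np.array([float(ord(c)) for c in text], dtype=np.float32)

arr = create_vector("spark")
as_list = arr.tolist()                            # plain Python floats, safe to return from a UDF
restored = np.asarray(as_list, dtype=np.float32)  # back to NumPy when needed

print(type(arr[0]).__name__, type(as_list[0]).__name__)  # float32 float
print(np.array_equal(arr, restored))                     # True
```

So keeping the list form in Spark and converting with np.asarray only where NumPy is actually needed costs one conversion per use, not a loss of precision.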

Upvotes: 16

Views: 8746

Answers (3)

Nurlybek Amangeldiuly

Reputation: 43

I suggest converting the NumPy arrays to dense vectors and declaring VectorUDT as the UDF's return type, so the code looks like this:

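from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as F

# Wrap each array in a DenseVector; VectorUDT tells Spark how to store it
arrayUDF = F.udf(lambda array: Vectors.dense(array), VectorUDT())

df = df.withColumn('pyspark_arrays', arrayUDF(F.col('vector')))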

To convert a vector back to a NumPy array, call its .toArray() method.

Upvotes: 2

user1460675

Reputation: 427

One way to do this is to convert the NumPy array in each row of the pandas DataFrame to a list of integers:

df['col_2'] = df['col_2'].map(lambda x: [int(e) for e in x])

Then convert it to a Spark DataFrame directly:

from pyspark.sql.functions import col, explode

df_spark = spark.createDataFrame(df)
df_spark.select('col_1', explode(col('col_2')).alias('col_2')).show(14)
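Note that int(e) truncates toward zero, so the conversion in this answer only suits integer-valued arrays; for float vectors like the one in the question, ndarray.tolist() keeps the fractional parts. A small NumPy-only check with made-up sample data:

```python
import numpy as np

row = np.array([0.25, 1.75, -2.5])  # made-up sample values

as_ints = [int(e) for e in row]  # int() truncates toward zero
as_floats = row.tolist()         # keeps the values as Python floats

print(as_ints)    # [0, 1, -2]
print(as_floats)  # [0.25, 1.75, -2.5]
```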

Upvotes: 0

pissall

Reputation: 7409

The source of the problem is that the object returned from the UDF doesn't conform to the declared type. create_vector presumably returns a numpy.ndarray whose elements are NumPy numeric types (such as numpy.float32), and neither the array nor those scalar types is compatible with the DataFrame API.

The only option is to convert the result to a plain Python list before returning it, with something like this:

udf(lambda x: create_vector(x).tolist(), ArrayType(FloatType()))
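If create_vector should stay NumPy-based internally, the tolist() conversion can be moved into a wrapper instead of a lambda. A minimal sketch; returns_python_list is a hypothetical helper name, and create_vector below is a hypothetical stand-in for the asker's function:

```python
import functools

import numpy as np

def returns_python_list(func):
    """Hypothetical helper: convert an ndarray return value to plain Python
    objects, which Spark can map to ArrayType(FloatType())."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        # ndarray.tolist() turns each NumPy scalar into the nearest Python type
        return result.tolist() if isinstance(result, np.ndarray) else result
    return wrapper

@returns_python_list
def create_vector(text):
    # Hypothetical stand-in: word lengths as a float32 vector
    return np.array([len(word) for word in text.split()], dtype=np.float32)

print(create_vector("store numpy arrays in spark"))  # [5.0, 5.0, 6.0, 2.0, 5.0]
```

With the decorator applied, the wrapped function could presumably be passed to udf(create_vector, ArrayType(FloatType())) as in the question; that Spark step is not exercised here.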

Upvotes: 1
