cylim

Reputation: 542

Store DenseVector in DataFrame column in PySpark

I am trying to store a DenseVector into a DataFrame in a new column.

I tried the following code, but got an AttributeError saying 'numpy.ndarray' object has no attribute '_get_object_id'.

from pyspark.sql import functions
from pyspark.mllib.linalg import Vectors

df = spark.createDataFrame([{'name': 'Alice', 'age': 1},
                            {'name': 'Bob', 'age': 2}])

vec = Vectors.dense([1.0, 3.0, 2.9])

df.withColumn('vector', functions.lit(vec))

I'm hoping to store a vector per row for computation purpose. Any help is appreciated.

[Python 3.7.3, Spark version 2.4.3, via Jupyter All-Spark-Notebook]

EDIT

I tried to follow the answer here as suggested by Florian, but I could not adapt the udf to take in a custom pre-constructed vector.

from pyspark.mllib.linalg import DenseVector, VectorUDT

conv = functions.udf(lambda x: DenseVector(x), VectorUDT())
# Same error with
# conv = functions.udf(lambda x: x, VectorUDT())

df.withColumn('vector', conv(vec)).show()

I get this error:

TypeError: Invalid argument, not a string or column: [1.0,3.0,2.9] of type <class 'pyspark.mllib.linalg.DenseVector'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

Upvotes: 2

Views: 2708

Answers (2)

Mahmoud

Reputation: 11459

Another way to get the same effect without using UDFs is to wrap the DenseVector in its own DataFrame and apply a cross join (Cartesian product):

from pyspark.ml.linalg import DenseVector

df = spark.createDataFrame([{'name': 'Alice', 'age': 1},
                            {'name': 'Bob', 'age': 2}])

df2 = spark.createDataFrame([{'vec' : DenseVector([1.0, 3.0, 2.9])}])
df.crossJoin(df2).show()
+---+-----+-------------+
|age| name|          vec|
+---+-----+-------------+
|  1|Alice|[1.0,3.0,2.9]|
|  2|  Bob|[1.0,3.0,2.9]|
+---+-----+-------------+
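As a quick sanity check (a sketch run against the same DataFrames as above), the joined column keeps the Spark ML vector type rather than degrading to an array of doubles:

# Inspect the schema of the joined DataFrame; 'vec' stays a vector column.
df.crossJoin(df2).printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)
#  |-- vec: vector (nullable = true)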

Upvotes: 0

Florian

Reputation: 25415

You could wrap the creation of the udf inside a function, so the function returns a udf that has captured your vector. An example is given below, hope this helps!

import pyspark.sql.functions as F
from pyspark.ml.linalg import VectorUDT, DenseVector

df = spark.createDataFrame([{'name': 'Alice', 'age': 1},
                            {'name': 'Bob', 'age': 2}])

def vector_column(x):
    # Return a zero-argument udf that emits the captured vector x for every row.
    return F.udf(lambda: x, VectorUDT())()

vec = DenseVector([1.0, 3.0, 2.9])
df.withColumn("vector", vector_column(vec)).show()

Output:

+---+-----+-------------+
|age| name|       vector|
+---+-----+-------------+
|  1|Alice|[1.0,3.0,2.9]|
|  2|  Bob|[1.0,3.0,2.9]|
+---+-----+-------------+
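If you eventually need a vector that varies per row rather than one fixed literal, the same udf pattern accepts column inputs. A minimal sketch, reusing the df above (passing 'age' twice is purely illustrative, just to keep the example self-contained):

# Sketch: build a per-row DenseVector from existing columns.
to_vector = F.udf(lambda *cols: DenseVector(cols), VectorUDT())
df.withColumn("vector", to_vector(F.col("age"), F.col("age"))).show()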

Upvotes: 2
