figs_and_nuts

Reputation: 5771

How to add a new column with a constant DenseVector to a pyspark dataframe?

I want to add a new column to a pyspark dataframe that contains a constant DenseVector.

Here is my attempt, but it fails:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector, VectorUDT

spark = SparkSession.builder.getOrCreate()

data = [(1,2),(3,4),(5,6),(7,8)]
df = spark.createDataFrame(data=data)

@udf(returnType=VectorUDT())
def add_cons_dense_col(val):
    return val

df.withColumn('ttt',add_cons_dense_col(DenseVector([1.,0.]))).show()

it fails with:

TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_3894138/803146743.py in <module>
----> 1 df.withColumn('ttt',add_cons_dense_col(DenseVector([1.,0.]))).show()

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/udf.py in wrapper(*args)
    197         @functools.wraps(self.func, assigned=assignments)
    198         def wrapper(*args):
--> 199             return self(*args)
    200 
    201         wrapper.__name__ = self._name

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/udf.py in __call__(self, *cols)
    177         judf = self._judf
    178         sc = SparkContext._active_spark_context
--> 179         return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
    180 
    181     # This function is for improving the online help system in the interactive interpreter.

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/column.py in _to_seq(sc, cols, converter)
     59     """
     60     if converter:
---> 61         cols = [converter(c) for c in cols]
     62     return sc._jvm.PythonUtils.toSeq(cols)
     63 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/column.py in <listcomp>(.0)
     59     """
     60     if converter:
---> 61         cols = [converter(c) for c in cols]
     62     return sc._jvm.PythonUtils.toSeq(cols)
     63 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/column.py in _to_java_column(col)
     43         jcol = _create_column_from_name(col)
     44     else:
---> 45         raise TypeError(
     46             "Invalid argument, not a string or column: "
     47             "{0} of type {1}. "

TypeError: Invalid argument, not a string or column: [1.0,0.0] of type <class 'pyspark.ml.linalg.DenseVector'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

Can you help me understand why this is failing?

Upvotes: 1

Views: 777

Answers (2)

blackbishop

Reputation: 32700

You need to pass an ArrayType column when you call your UDF, not a DenseVector. You also need to change add_cons_dense_col to return a DenseVector:

import pyspark.sql.functions as F
from pyspark.ml.linalg import DenseVector, VectorUDT

@F.udf(returnType=VectorUDT())
def add_cons_dense_col(val):
    # build the vector from the array column's values
    return DenseVector(val)

df.withColumn('ttt', add_cons_dense_col(F.array(F.lit(1.), F.lit(0.)))).show()

#+---+---+---------+
#| _1| _2|      ttt|
#+---+---+---------+
#|  1|  2|[1.0,0.0]|
#|  3|  4|[1.0,0.0]|
#|  5|  6|[1.0,0.0]|
#|  7|  8|[1.0,0.0]|
#+---+---+---------+

To create an array column from a Python list:

F.array(*[F.lit(x) for x in [1., 0., 3., 5.]])
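If you're on Spark 3.1 or later (an assumption about your environment), a UDF-free sketch of the same idea uses pyspark.ml.functions.array_to_vector:

import pyspark.sql.functions as F
from pyspark.ml.functions import array_to_vector

# converts an array<double> column into an ML vector column, no Python UDF needed
df.withColumn('ttt', array_to_vector(F.array(F.lit(1.), F.lit(0.)))).show()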

Upvotes: 1

过过招

Reputation: 4234

You can try a zero-argument UDF:

import pyspark.sql.functions as F
from pyspark.ml.linalg import DenseVector, VectorUDT

# the lambda takes no arguments and returns the same constant vector for every row
add_cons_dense_col = F.udf(lambda: DenseVector([1., 0.]), VectorUDT())
df = df.withColumn('ttt', add_cons_dense_col())
df.show(truncate=False)
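Since the lambda takes no arguments, the UDF is called with no input columns; Spark still evaluates it once per row, so every row gets the same DenseVector. If the constant ever needs to vary, you can parameterize it through a closure over the lambda instead.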

Upvotes: 2
