Reputation: 5763
I have a few array-type columns and DenseVector-type columns in my PySpark DataFrame. I want to create new columns that are element-wise additions of these columns. Below is the code that summarises the problem:
Setup:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.functions import vector_to_array
from pyspark.ml.linalg import VectorUDT, DenseVector
from pyspark.sql.functions import udf, array, lit
spark = SparkSession.builder.getOrCreate()
data = [(1,4),(2,5),(3,6)]
a = spark.createDataFrame(data)
f = udf(lambda x: DenseVector(x), returnType=VectorUDT())
import pyspark.sql.functions as F
@F.udf(returnType=VectorUDT())
def add_cons_dense_col(val):
    return DenseVector(val)
a = a.withColumn('d1', add_cons_dense_col(F.array([F.lit(1.), F.lit(1.)])))
a = a.withColumn('d2', add_cons_dense_col(F.array([F.lit(1.), F.lit(1.)])))
a = a.withColumn('l1', F.array([F.lit(1.), F.lit(1.)]))
a = a.withColumn('l2', F.array([F.lit(1.), F.lit(1.)]))
a.show()
output:
+---+---+---------+---------+----------+----------+
| _1| _2| d1| d2| l1| l2|
+---+---+---------+---------+----------+----------+
| 1| 4|[1.0,1.0]|[1.0,1.0]|[1.0, 1.0]|[1.0, 1.0]|
| 2| 5|[1.0,1.0]|[1.0,1.0]|[1.0, 1.0]|[1.0, 1.0]|
| 3| 6|[1.0,1.0]|[1.0,1.0]|[1.0, 1.0]|[1.0, 1.0]|
+---+---+---------+---------+----------+----------+
I can perform the following operations to the same effect on _1, _2:
a.withColumn('l_sum', a._1+a._2)
a.withColumn('l_sum', a['_1']+a['_2'])
a.withColumn('l_sum', col('_1') + col('_2'))
I want to be able to perform the same additions on d1, d2 and on l1, l2, but all three approaches fail. I am looking to add the arrays or DenseVectors element-wise, for example:
a.withColumn('l_sum', a.d1+a.d2).show()
a.withColumn('l_sum', a['d1']+a['d2']).show()
a.withColumn('l_sum', col('d1') + col('d2')).show()
But I get the following error:
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/dataframe.py in withColumn(self, colName, col)
2476 if not isinstance(col, Column):
2477 raise TypeError("col should be Column")
-> 2478 return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
2479
2480 def withColumnRenamed(self, existing, new):
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
1307
1308 answer = self.gateway_client.send_command(command)
-> 1309 return_value = get_return_value(
1310 answer, self.gateway_client, self.target_id, self.name)
1311
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
115 # Hide where the exception came from that shows a non-Pythonic
116 # JVM exception message.
--> 117 raise converted from None
118 else:
119 raise
AnalysisException: cannot resolve '(d1 + d2)' due to data type mismatch: '(d1 + d2)' requires (numeric or interval or interval day to second or interval year to month) type, not struct<type:tinyint,size:int,indices:array<int>,values:array<double>>;
'Project [_1#0L, _2#1L, d1#5, d2#10, l1#15, l2#21, (d1#5 + d2#10) AS l_sum#365]
+- Project [_1#0L, _2#1L, d1#5, d2#10, l1#15, array(1.0, 1.0) AS l2#21]
+- Project [_1#0L, _2#1L, d1#5, d2#10, array(1.0, 1.0) AS l1#15]
+- Project [_1#0L, _2#1L, d1#5, add_cons_dense_col(array(1.0, 1.0)) AS d2#10]
+- Project [_1#0L, _2#1L, add_cons_dense_col(array(1.0, 1.0)) AS d1#5]
+- LogicalRDD [_1#0L, _2#1L], false
Can you help me create a column that is the element-wise addition of array-type columns or DenseVector-type columns?
Upvotes: 2
Views: 1517
Reputation: 1857
For an element-wise sum you can use this:
# transform iterates over l1 as (element, index); element_at is 1-based,
# hence index + 1 to pick the matching element of l2
a = (a
     .withColumn('elementWiseSum', F.expr('transform(l1, (element, index) -> element + element_at(l2, index + 1))'))
     )
a.show()
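This only covers the array columns l1 and l2. For the DenseVector columns you would first convert them to arrays, e.g. with vector_to_array (already imported in the setup above). A minimal sketch, assuming the column names d1_arr, d2_arr and vectorWiseSum are free to use:
from pyspark.ml.functions import vector_to_array

# Sketch: convert the vector columns to arrays, then reuse the same
# transform/element_at expression for an element-wise sum
a = (a
     .withColumn('d1_arr', vector_to_array(F.col('d1')))
     .withColumn('d2_arr', vector_to_array(F.col('d2')))
     .withColumn('vectorWiseSum',
                 F.expr('transform(d1_arr, (element, index) -> element + element_at(d2_arr, index + 1))'))
     )
a.show()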
Upvotes: 0
Reputation: 3232
Spark does not allow native operations to be applied on Vector columns using expressions, so a UDF is needed. For element-wise summation of arrays, we can zip the arrays together using arrays_zip and apply the higher-order function transform to sum the zipped array.
@F.udf(returnType=VectorUDT())
def sum_vector(v1: DenseVector, v2: DenseVector) -> DenseVector:
    # DenseVector supports +, so this is already an element-wise addition
    return v1 + v2
(a.withColumn("vector_sum", sum_vector(F.col("d1"), F.col("d2")))
.withColumn("array_sum", F.expr("transform(arrays_zip(l1, l2), x -> x.l1 + x.l2)"))
).show()
"""
+---+---+---------+---------+----------+----------+----------+----------+
| _1| _2| d1| d2| l1| l2|vector_sum| array_sum|
+---+---+---------+---------+----------+----------+----------+----------+
| 1| 4|[1.0,1.0]|[1.0,1.0]|[1.0, 1.0]|[1.0, 1.0]| [2.0,2.0]|[2.0, 2.0]|
| 2| 5|[1.0,1.0]|[1.0,1.0]|[1.0, 1.0]|[1.0, 1.0]| [2.0,2.0]|[2.0, 2.0]|
| 3| 6|[1.0,1.0]|[1.0,1.0]|[1.0, 1.0]|[1.0, 1.0]| [2.0,2.0]|[2.0, 2.0]|
+---+---+---------+---------+----------+----------+----------+----------+
"""
In Spark 3.0 the vector_to_array function was introduced, and Spark 3.1 added array_to_vector; using these, the vector summation can be done without a UDF by converting the vectors to arrays. Further, from Spark 3.1, zip_with can be used to apply an element-wise operation on two arrays.
from pyspark.sql import Column
from pyspark.ml.functions import vector_to_array, array_to_vector
def array_sum_expression_builder(c1: Column, c2: Column) -> Column:
    # zip_with merges the two arrays element by element with the given lambda
    return F.zip_with(c1, c2, lambda x, y: x + y)
result = (a.withColumn("vector_sum", array_to_vector(
array_sum_expression_builder(
vector_to_array(F.col("d1")),
vector_to_array(F.col("d2")))))
.withColumn("array_sum", array_sum_expression_builder(F.col("l1"), F.col("l2")))
)
result.show()
"""
+---+---+---------+---------+----------+----------+----------+----------+
| _1| _2| d1| d2| l1| l2|vector_sum| array_sum|
+---+---+---------+---------+----------+----------+----------+----------+
| 1| 4|[1.0,1.0]|[1.0,1.0]|[1.0, 1.0]|[1.0, 1.0]| [2.0,2.0]|[2.0, 2.0]|
| 2| 5|[1.0,1.0]|[1.0,1.0]|[1.0, 1.0]|[1.0, 1.0]| [2.0,2.0]|[2.0, 2.0]|
| 3| 6|[1.0,1.0]|[1.0,1.0]|[1.0, 1.0]|[1.0, 1.0]| [2.0,2.0]|[2.0, 2.0]|
+---+---+---------+---------+----------+----------+----------+----------+
"""
result.printSchema()
"""
root
|-- _1: long (nullable = true)
|-- _2: long (nullable = true)
|-- d1: vector (nullable = true)
|-- d2: vector (nullable = true)
|-- l1: array (nullable = false)
| |-- element: double (containsNull = false)
|-- l2: array (nullable = false)
| |-- element: double (containsNull = false)
|-- vector_sum: vector (nullable = true)
|-- array_sum: array (nullable = false)
| |-- element: double (containsNull = true)
"""
Upvotes: 4