Sam Comber

Reputation: 1293

How to calculate element-wise multiplication between two ArrayType columns with pyspark

I'm trying to calculate the element-wise product between two ArrayType columns in my PySpark DataFrame. I've tried the expression below, but I can't seem to get a correct result...

from pyspark.sql import functions as F

data.withColumn("array_product", F.expr("transform(CASUAL_TOPS_SIMILARITY_SCORE, (x, PER_UNA_SIMILARITY_SCORE) -> x * PER_UNA_SIMILARITY_SCORE)"))

Does anyone have any tips on how I can get the correct result here? I have attached a test row from this DataFrame below, where I need to multiply the column CASUAL_TOPS_SIMILARITY_SCORE element-wise with PER_UNA_SIMILARITY_SCORE.

import json
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test").getOrCreate()

js = '{"PER_UNA_SIMILARITY_SCORE":{"category_list":[0.9736891648,0.9242207186,0.9717901106,0.9763716155,0.9440944231,0.9708032326,0.9599383329,0.9705343027,0.804267581,0.9597317177,0.9316773281,0.8076725314,0.9555369889,0.9753550725,0.9811865431,1.0,0.8231541809,0.9738989392,0.9780283991,0.9644088011,0.9798529418,0.9347357116,0.9727502648,0.9778486916,0.8621780792,0.9735844196,0.9582644436,0.9579092722,0.8890027888,0.9394986243,0.9563411605,0.9811867597,0.9738380108,0.9577698381,0.7912932623,0.9778158279]},"CASUAL_TOPS_SIMILARITY_SCORE":{"category_list":[0.7924168764,0.7511316884,0.7925161719,0.8007234107,0.7953468064,0.7882556409,0.7778519374,0.7881058994,1.0,0.7785517364,0.7733458123,0.7426205538,0.7905195275,0.7925983778,0.7983386701,0.804267581,0.6749185095,0.7924821952,0.8016348085,0.7895650508,0.7985721918,0.772656847,0.7897495222,0.7948759958,0.6996340275,0.8024327668,0.7784598142,0.7942396044,0.7159431296,0.7850145414,0.7768001023,0.7983372946,0.7971616495,0.7927845035,0.6462844274,0.799555357]}}'

a_json = json.loads(js)

data = spark.createDataFrame(pd.DataFrame.from_dict(a_json))
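For illustration, the result I'm after is just the pairwise product of the two lists, which can be checked in plain Python (the expected variable is only there to show what I want; it reuses the a_json dict from above):

a = a_json["CASUAL_TOPS_SIMILARITY_SCORE"]["category_list"]
b = a_json["PER_UNA_SIMILARITY_SCORE"]["category_list"]

# pairwise products; first entry is 0.7924168764 * 0.9736891648 ≈ 0.7715677
expected = [x * y for x, y in zip(a, b)]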

Upvotes: 1

Views: 1574

Answers (1)

Steven

Reputation: 15258

The simplest (but not necessarily most efficient) way is to use a UDF:

from pyspark.sql import functions as F, types as T


@F.udf(T.ArrayType(T.FloatType()))
def product(A, B):
    # element-wise product of the two arrays; note the FloatType return type
    return [x * y for x, y in zip(A, B)]


data.withColumn(
    "array_product",
    product(F.col("CASUAL_TOPS_SIMILARITY_SCORE"), F.col("PER_UNA_SIMILARITY_SCORE")),
).show()

+----------------------------+------------------------+--------------------+    
|CASUAL_TOPS_SIMILARITY_SCORE|PER_UNA_SIMILARITY_SCORE|       array_product|
+----------------------------+------------------------+--------------------+
|        [0.7924168764, 0....|    [0.9736891648, 0....|[0.7715677, 0.694...|
+----------------------------+------------------------+--------------------+
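Side note: the UDF above declares ArrayType(FloatType()), so the products are cast to single precision. If you need to keep double precision, a minimal variant would look like this (product_double is just an illustrative name, same imports as above):

@F.udf(T.ArrayType(T.DoubleType()))
def product_double(A, B):
    # same pairwise multiplication, but returning doubles instead of floats
    return [x * y for x, y in zip(A, B)]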

EDIT: from Spark 2.4, you can also use built-in SQL functions:

data.withColumn(
    "array_product",
    F.arrays_zip(
        F.col("CASUAL_TOPS_SIMILARITY_SCORE"), F.col("PER_UNA_SIMILARITY_SCORE")
    ),
).withColumn(
    "array_product",
    F.expr(
        "transform(array_product, x -> x['CASUAL_TOPS_SIMILARITY_SCORE']*x['PER_UNA_SIMILARITY_SCORE'])"
    ),
).show()

+----------------------------+------------------------+--------------------+    
|CASUAL_TOPS_SIMILARITY_SCORE|PER_UNA_SIMILARITY_SCORE|       array_product|
+----------------------------+------------------------+--------------------+
|        [0.7924168764, 0....|    [0.9736891648, 0....|[0.77156772655534...|
+----------------------------+------------------------+--------------------+

NB: the two methods return slightly different results because of floating-point precision: the UDF declares FloatType, so its products are cast to single precision, while the built-in functions keep the original double precision.
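For the record, Spark 2.4 also ships the zip_with higher-order SQL function (exposed as F.zip_with in the Python API from 3.1), which expresses the same thing in one expression. A minimal sketch, assuming Spark 2.4+ and the same column names:

data.withColumn(
    "array_product",
    F.expr(
        "zip_with(CASUAL_TOPS_SIMILARITY_SCORE, PER_UNA_SIMILARITY_SCORE, (x, y) -> x * y)"
    ),
).show()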

Upvotes: 7
