Reputation: 2501
I have a dataframe where one of the columns holds a list of items. Note that this column, "sorted_zipped", was computed with the "arrays_zip" function in PySpark (on two other columns that I have since dropped). I want to compute the mean of the second value of each item, grouped by the first value (the brand). I am just moving over from regular Python Pandas to PySpark and things are very different. I am learning as fast as I can.
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|WeekOfYear|sorted_zipped |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|13-2023 |[{bmw, 0.99}, {vw, 0.98}, {chevy, 0.97}, {buick, 0.96}] |
|14-2023 |[{chevy, 0.98}, {bmw, 0.98}, {bmw, 0.978}, {bmw, 0.976}, {vw, 0.975}, {bmw, 0.975}, {bmw, 0.97}, {buick, 0.967}, {vw, 0.964}, {vw, 0.96}, {nissan, 0.96}, {chevy, 0.952}, {nissan, 0.95}, {nissan, 0.95}, {lexus, 0.95}, {lexus, 0.94}, {lexus, 0.94}, {nissan, 0.935}, {buick, 0.93}, {chevy, 0.928}]|
|15-2023 |[{chevy, 0.992}, {bmw, 0.987}, {nissan, 0.982}, {bmw, 0.982}, {buick, 0.978}, {lexus, 0.976}, {bmw, 0.975}, {bmw, 0.97}, {chevy, 0.967}, {vw, 0.964}, {lexus, 0.961}, {nissan, 0.96}, {vw, 0.952}, {nissan, 0.952}, {vw, 0.952}, {lexus, 0.943}] |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
I want another column in this dataframe that holds the mean for each distinct element in the list. For example, for the third row of this dataframe:
[{chevy, 0.992}, {bmw, 0.987}, {nissan, 0.982}, {bmw, 0.982}, {buick, 0.978}, {lexus, 0.976}, {bmw, 0.975}, {bmw, 0.97}, {chevy, 0.967}, {vw, 0.964}, {lexus, 0.961}, {nissan, 0.96}, {vw, 0.952}, {nissan, 0.952}, {vw, 0.952}, {lexus, 0.943}]
The third column, the mean, should look like this (sorted in descending order of the mean values):
[{chevy, 0.9795}, {bmw, 0.9785}, {buick, 0.978}, {nissan, 0.9647}, {lexus, 0.96}, {vw, 0.956}]
To begin with, I learnt that the equivalent of a dictionary in PySpark is a Map. I thought that I could create a map out of each row of "sorted_zipped", compute the mean for each key, and use that as a UDF. I am not sure whether I am heading in the right direction or just plodding around. Any help is appreciated.
def get_avg_1(x):
    rdd = parallelize(x)
    rdd2 = rdd.flatMap(lambda x: [(k, v) for (k, v) in x.items()]).collect()
    grouped_k = rdd2.groupByKey()
    #print [(k, list(v)) for (k, v) in grouped_k.take(1)]
    # compute avg of the values
    avg_map = grouped_k.mapValues(lambda x: sum(x[1])/len(x[1])).collect()
    return avg_map
When I tried to use the above as a UDF, I hit other problems on Databricks. Because Databricks creates a SparkContext by itself, I cannot pass a separate context to the worker nodes. There seems to be some restriction on using sc on worker nodes.
Update: I tried this..
import numpy as np
import json
schema = ArrayType(StructType([
    StructField("GroupedBrands", StringType(), True),
    StructField("GroupedWeights", FloatType(), True)
]))
array_mean = F.udf(lambda x: (x[0], np.mean(x[1]), schema))
mean_df = sdf.withColumn("mean_value", array_mean("sorted_zipped"))
mean_df.show()
I get the exception below, which tells me that each row of "sorted_zipped" is of type List.
PythonException: An exception was thrown from a UDF: 'TypeError: cannot perform reduce with flexible type'
Upvotes: 0
Views: 159
Reputation: 1757
You can simply define your UDF in pure Python. Check out this solution.
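As for the exception in the question, my reading is that the lambda receives the whole array for a row, so x[1] is the second (brand, weight) struct rather than the list of weights. np.mean is then handed a mix of strings and floats, which is what the "flexible type" message refers to. The schema also ended up inside the lambda's returned tuple instead of being passed to udf as the return type. A minimal sketch of the indexing, using plain tuples as stand-ins for the structs Spark passes in (an assumption for illustration only):

```python
# Plain tuples stand in for the structs Spark passes to the UDF (assumption).
row = [("chevy", 0.98), ("bmw", 0.98), ("bmw", 0.978)]

# Indexing the row picks out one (brand, weight) pair, not the weights.
second_item = row[1]

# The values the mean was meant to be computed over.
weights = [weight for _, weight in row]

print(second_item)
print(weights)
```

So the grouping has to iterate over every item in the array instead of indexing into it.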
Sample input
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, FloatType, MapType
spark = SparkSession.builder.master("local[1]") \
    .appName('TestApp') \
    .getOrCreate()
data = [
("13-2023", [("bmw", 0.99), ("vw", 0.98), ("chevy", 0.97), ("buick", 0.96)]),
("14-2023", [("chevy", 0.98), ("bmw", 0.98), ("bmw", 0.978), ("bmw", 0.976), ("vw", 0.975), ("bmw", 0.975), ("bmw", 0.97), ("buick", 0.967), ("vw", 0.964), ("vw", 0.96), ("nissan", 0.96), ("chevy", 0.952), ("nissan", 0.95), ("nissan", 0.95), ("lexus", 0.95), ("lexus", 0.94), ("lexus", 0.94), ("nissan", 0.935), ("buick", 0.93), ("chevy", 0.928)]),
("15-2023", [("chevy", 0.992), ("bmw", 0.987), ("nissan", 0.982), ("bmw", 0.982), ("buick", 0.978), ("lexus", 0.976), ("bmw", 0.975), ("bmw", 0.97), ("chevy", 0.967), ("vw", 0.964), ("lexus", 0.961), ("nissan", 0.96), ("vw", 0.952), ("nissan", 0.952), ("vw", 0.952), ("lexus", 0.943)])
]
schema = StructType([
    StructField("WeekOfYear", StringType(), True),
    StructField("sorted_zipped", ArrayType(
        StructType([
            StructField("Brand", StringType(), True),
            StructField("Weight", FloatType(), True)
        ])
    ), True)
])
df = spark.createDataFrame(data=data,schema=schema)
df.show(truncate=False)
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|WeekOfYear|sorted_zipped |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|13-2023 |[{bmw, 0.99}, {vw, 0.98}, {chevy, 0.97}, {buick, 0.96}] |
|14-2023 |[{chevy, 0.98}, {bmw, 0.98}, {bmw, 0.978}, {bmw, 0.976}, {vw, 0.975}, {bmw, 0.975}, {bmw, 0.97}, {buick, 0.967}, {vw, 0.964}, {vw, 0.96}, {nissan, 0.96}, {chevy, 0.952}, {nissan, 0.95}, {nissan, 0.95}, {lexus, 0.95}, {lexus, 0.94}, {lexus, 0.94}, {nissan, 0.935}, {buick, 0.93}, {chevy, 0.928}]|
|15-2023 |[{chevy, 0.992}, {bmw, 0.987}, {nissan, 0.982}, {bmw, 0.982}, {buick, 0.978}, {lexus, 0.976}, {bmw, 0.975}, {bmw, 0.97}, {chevy, 0.967}, {vw, 0.964}, {lexus, 0.961}, {nissan, 0.96}, {vw, 0.952}, {nissan, 0.952}, {vw, 0.952}, {lexus, 0.943}] |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Define your UDF
def mean(items):
    # Collect the weights for each brand, then replace each list with its average.
    mydict = {}
    for i in items:
        if i.Brand in mydict:
            mydict[i.Brand].append(i.Weight)
        else:
            mydict[i.Brand] = [i.Weight]
    for k, v in mydict.items():
        mydict[k] = sum(v) / len(v)
    return mydict
# The UDF returns a map of brand -> mean weight.
mean_udf = udf(mean, MapType(StringType(), FloatType()))
Apply the UDF:
df.withColumn("mean_value", mean_udf(df.sorted_zipped)).drop(df.sorted_zipped).show(10, False)
+----------+--------------------------------------------------------------------------------------------------------------------+
|WeekOfYear|mean_value |
+----------+--------------------------------------------------------------------------------------------------------------------+
|13-2023 |{chevy -> 0.97, vw -> 0.98, buick -> 0.96, bmw -> 0.99} |
|14-2023 |{chevy -> 0.9533333, vw -> 0.9663333, buick -> 0.94850004, nissan -> 0.94875, lexus -> 0.9433333, bmw -> 0.97580004}|
|15-2023 |{chevy -> 0.9795, vw -> 0.95600003, buick -> 0.978, nissan -> 0.96466666, lexus -> 0.96000004, bmw -> 0.9785} |
+----------+--------------------------------------------------------------------------------------------------------------------+
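The map above does not keep any particular order, while the question asked for the means sorted in descending order. One possible variant is to return a sorted list of (brand, mean) pairs instead of a dict. This is only a sketch: the names mean_sorted and Item are made up for illustration, and the namedtuple merely stands in for the Row objects Spark would pass to the UDF.

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-in for the Row objects Spark passes to a UDF.
Item = namedtuple("Item", ["Brand", "Weight"])

def mean_sorted(items):
    """Return [(brand, mean_weight), ...] sorted by mean, highest first."""
    groups = defaultdict(list)
    for item in items:
        groups[item.Brand].append(item.Weight)
    means = [(brand, sum(ws) / len(ws)) for brand, ws in groups.items()]
    return sorted(means, key=lambda pair: pair[1], reverse=True)

# Week 13-2023 from the sample input: every brand appears exactly once.
week_13 = [Item("bmw", 0.99), Item("vw", 0.98), Item("chevy", 0.97), Item("buick", 0.96)]
result = mean_sorted(week_13)
print(result)
```

To use this as a UDF, the return type would be an array of structs rather than a map, for example udf(mean_sorted, ArrayType(StructType([StructField("Brand", StringType()), StructField("MeanWeight", DoubleType())]))), where MeanWeight is an illustrative field name. Declaring both the input weights and the result as DoubleType rather than FloatType should also avoid single-precision artifacts such as 0.95600003 in the output above.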
Upvotes: 1