Reputation: 881
To calculate a row-wise weighted median, I have written the code below. Where am I going wrong? The generated values all come out as null. col_A holds the values and col_B holds the weights associated with those values.
Code:
def get_median(values,weights):
    return np.median(np.repeat(values,weights)) # function created to calculate wt. median
wimedian = F.udf(get_median,DoubleType()) # registering as udf here
myview = df.groupBy('category').agg(
F.collect_list(F.col('col_A')),
F.collect_list(F.col('col_B'))
).withColumn('Weighted_median',wimedian(F.col('col_A'),F.col('col_B')))
myview.show(3)
Output table:
+-----------+--------+-------+---------------+
|category |col_A |col_B |Weighted_median|
+-----------+--------+-------+---------------+
|001 |[69] |[8] |null |
|002 |[69] |[14] |null |
|003 |[28, 21]|[3, 1] |null |
+-----------+--------+-------+---------------+
FYI, the correct output for row #3 in this table should be the median of [28, 28, 28, 21] = 28. That's what np.median and np.repeat are there for.
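The weighted-median logic itself can be checked outside Spark. A minimal sketch using only NumPy, assuming integer weights as in the example data: each value is repeated by its weight with np.repeat, then np.median is taken. The `rows` dictionary is hypothetical and simply mirrors the three groups from the output table.

```python
import numpy as np

def get_median(values, weights):
    # repeat each value by its integer weight, e.g. [28, 21] with [3, 1] -> [28, 28, 28, 21],
    # then take the ordinary median of the expanded array
    return np.median(np.repeat(values, weights))

# hypothetical test data: the (col_A, col_B) lists for each category in the table above
rows = {"001": ([69], [8]), "002": ([69], [14]), "003": ([28, 21], [3, 1])}
for category, (values, weights) in rows.items():
    print(category, get_median(values, weights))
```

This prints 69.0, 69.0 and 28.0, so the arithmetic is fine and the null values must come from how the function interacts with Spark.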
Upvotes: 0
Views: 499
Reputation: 166
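The problem seems to be the return type: np.median returns a NumPy type (numpy.float64) rather than a built-in Python float, and the DataFrame does not understand it as DoubleType. Also, the column references in the withColumn statement are incorrect: after groupBy().agg(), the aggregated columns are named collect_list(col_A) and collect_list(col_B), not col_A and col_B.
I converted the return value to float, and it runs now: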
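import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
def get_median(values,weights):
    return float(np.median(np.repeat(values,weights)))  # plain Python float, not numpy.float64
wimedian = F.udf(get_median,DoubleType())
df = sc.parallelize([["001",69,8],["002",69,14],["003",28,3],["003",21,1]]).toDF(["category","col_A","col_B"])  # sc: existing SparkContext
# after agg(), the collected columns are named "collect_list(col_A)" and "collect_list(col_B)"
myview = df.groupBy('category').agg(
    F.collect_list(F.col('col_A')),
    F.collect_list(F.col('col_B'))).withColumn('Weighted_median',wimedian(F.col("collect_list(col_A)"),F.col("collect_list(col_B)")))
myview.show()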
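The effect of the float() cast can be seen without Spark. A minimal NumPy-only sketch: `get_median_raw` is a hypothetical name for the original version, and `get_median` is the casted version from the answer. It only shows the Python types involved; it does not reproduce Spark's own type conversion.

```python
import numpy as np

def get_median_raw(values, weights):
    # hypothetical name for the original version: returns whatever np.median returns
    return np.median(np.repeat(values, weights))

def get_median(values, weights):
    # casted version: converts the NumPy scalar to a built-in Python float
    return float(np.median(np.repeat(values, weights)))

raw = get_median_raw([28, 21], [3, 1])
fixed = get_median([28, 21], [3, 1])
print(type(raw).__name__, raw)      # float64 28.0
print(type(fixed).__name__, fixed)  # float 28.0
```

Both hold the same number; only the type differs, which is why the cast is enough to make the DoubleType UDF produce values.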
Upvotes: 2