Mysterious

Reputation: 881

Weighted median pyspark dataframes

I have written the code below to calculate a row-wise weighted median, but the generated values all come out as null. Where am I going wrong? col_A holds the values and col_B holds the weights associated with those values.

Code:

def get_median(values,weights):
    return np.median(np.repeat(values,weights))    # function created to calculate wt. median

wimedian = F.udf(get_median,DoubleType())    # registering as udf here

myview = df.groupBy('category').agg(
    F.collect_list(F.col('col_A')),
    F.collect_list(F.col('col_B'))
).withColumn('Weighted_median',wimedian(F.col('col_A'),F.col('col_B')))

myview.show(3)

Output table:

+-----------+--------+-------+---------------+
|category   |col_A   |col_B  |Weighted_median|
+-----------+--------+-------+---------------+
|001        |[69]    |[8]    |null           |
|002        |[69]    |[14]   |null           |
|003        |[28, 21]|[3, 1] |null           |
+-----------+--------+-------+---------------+

FYI, the correct output for row #3 in this table should be the median of [28,28,28,21] = 28. That is what np.median and np.repeat are there for.

Upvotes: 0

Views: 499

Answers (1)

Kapil

Reputation: 166

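The problem seems to be the return type: the DataFrame does not understand the numpy type that np.median returns. The column references in the withColumn statement are also incorrect, because without an alias the aggregated columns are named collect_list(col_A) and collect_list(col_B).

I converted the return value to float, and it runs now: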

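import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

def get_median(values, weights):
    # np.median returns a numpy float; cast it to a plain Python float for DoubleType
    return float(np.median(np.repeat(values, weights)))

wimedian = F.udf(get_median, DoubleType())
df = sc.parallelize([["001",69,8],["002",69,14],["003",28,3],["003",21,1]]).toDF(["category","col_A","col_B"])

# Call show() separately: it returns None, so chaining it would leave myview empty
myview = df.groupBy('category').agg(
    F.collect_list(F.col('col_A')),
    F.collect_list(F.col('col_B'))
).withColumn('Weighted_median', wimedian(F.col("collect_list(col_A)"), F.col("collect_list(col_B)")))

myview.show()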
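
As a side note, here is a minimal sketch of the same repeat-then-median idea using only the standard library, assuming the weights are non-negative integers. The name weighted_median is just for illustration and does not come from the question. Since no numpy is involved, the result is already a plain Python float, so it could presumably be registered with F.udf(weighted_median, DoubleType()) in the same way as get_median.

```python
from itertools import chain, repeat
from statistics import median

def weighted_median(values, weights):
    # Repeat each value by its integer weight,
    # e.g. values [28, 21] with weights [3, 1] -> [28, 28, 28, 21]
    expanded = list(chain.from_iterable(repeat(v, w) for v, w in zip(values, weights)))
    if not expanded:
        # No data (empty lists or all-zero weights): return None, which Spark shows as null
        return None
    # float() guarantees a plain Python float even when the median is an int
    return float(median(expanded))

print(weighted_median([28, 21], [3, 1]))  # 28.0
```

Note that this expands the list in memory just like np.repeat does, so for very large weights a cumulative-weight approach would be a better fit.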

Upvotes: 2
