Markus

Reputation: 3782

None values when calculating the mean and standard deviation

I am calculating the mean and standard deviation of the scores in the nested products column of a PySpark DataFrame.

+----------+--------------------------------+
|product_PK|                        products|
+----------+--------------------------------+
|       686|          [[686,520.70],[645,2]]|
|       685|[[685,45.556],[678,23],[655,21]]|
|       693|                              []|
+----------+--------------------------------+

The problem is that I get None for both the mean and the standard deviation. Most likely this happens because the code does not handle the empty array []; empty values should be replaced by 0. Also, IntegerType should probably be a floating-point type.

How can I get the correct result instead of None?

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, col, udf, mean as mean_, stddev as stddev_

df = sqlCtx.createDataFrame(
    [(686, [[686,520.70], [645,2]]), (685, [[685,45.556], [678,23],[655,21]]), (693, [])],
    ["product_PK", "products"]
)

get_score = udf(lambda x: x[1], IntegerType())

df_stats = df.withColumn('exploded', explode(col('products')))\
    .withColumn('score', get_score(col('exploded')))\
    .select(
        mean_(col('score')).alias('mean'),
        stddev_(col('score')).alias('std')
    )\
    .collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']

print([mean, std])

Upvotes: 0

Views: 2925

Answers (1)

philantrovert

Reputation: 10082

First, you don't need a UDF to fetch an item from the array; Column.getItem does that directly.

Second, just use na.fill to replace NULL values with a number (zero in your case).

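# getItem(1) reads the score directly, so no UDF is needed;
# na.fill(0) replaces null values in numeric columns with 0.
(df.withColumn("exploded", explode(col("products")))
   .withColumn("score", col("exploded").getItem(1))
   .na.fill(0)
   .select(
       mean_(col("score")).alias("mean"),
       stddev_(col("score")).alias("stddev")
   )
   .show())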

+----+------------------+
|mean|            stddev|
+----+------------------+
| 9.2|11.734564329364767|
+----+------------------+
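As a sanity check on the output above (Spark's stddev is the sample standard deviation), both numbers can be reproduced from just five scores: 0, 2, 0, 23 and 21. That suggests the decimal scores 520.70 and 45.556 came through as null, presumably because the nested arrays were inferred as integer arrays, and na.fill(0) then turned them into 0. The empty list of product 693 contributes nothing, because explode emits no row for an empty array.

```python
import statistics

# Scores implied by the output: 520.70 and 45.556 replaced by 0,
# product 693 absent because explode() emits no row for [].
scores = [0, 2, 0, 23, 21]

mean = statistics.mean(scores)     # 46 / 5
stddev = statistics.stdev(scores)  # sample (n - 1) standard deviation, like Spark's stddev

print(mean, stddev)
```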

To get the values into separate variables:

# Same pipeline, but first() returns a Row instead of printing a table
row = (df.withColumn("exploded", explode(col("products")))
         .withColumn("score", col("exploded").getItem(1))
         .na.fill(0)
         .select(
             mean_(col("score")).alias("mean"),
             stddev_(col("score")).alias("stddev")
         )
         .first())

mean = row.mean
stddev = row.stddev

Upvotes: 2
