Reputation: 3782
I am calculating the mean and standard deviation of the nested data in the products column of a PySpark DataFrame:
+----------+--------------------------------+
|product_PK|                        products|
+----------+--------------------------------+
|       686|          [[686,520.70],[645,2]]|
|       685|[[685,45.556],[678,23],[655,21]]|
|       693|                              []|
+----------+--------------------------------+
The problem is that I get None as the values of the mean and standard deviation. Most probably this happens because the code does not handle the empty array []; those empty values should be substituted with 0. Also, IntegerType should probably be a float type, since some of the scores are fractional.
How can I get the correct result instead of None?
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, col, udf, mean as mean_, stddev as stddev_

df = sqlCtx.createDataFrame(
    [(686, [[686, 520.70], [645, 2]]), (685, [[685, 45.556], [678, 23], [655, 21]]), (693, [])],
    ["product_PK", "products"]
)

# pull the score (second element) out of each [id, score] pair
get_score = udf(lambda x: x[1], IntegerType())

df_stats = df.withColumn('exploded', explode(col('products')))\
    .withColumn('score', get_score(col('exploded')))\
    .select(
        mean_(col('score')).alias('mean'),
        stddev_(col('score')).alias('std')
    )\
    .collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']
print([mean, std])
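For what it's worth, a PySpark UDF silently returns null whenever the Python function's return value does not match the declared return type, and those nulls then surface as None in aggregates. A minimal sketch of the type fix suspected above, assuming the scores should be doubles:

from pyspark.sql.types import DoubleType

# Declaring DoubleType instead of IntegerType stops fractional
# scores such as 520.70 from being silently nulled by the UDF.
get_score = udf(lambda x: float(x[1]), DoubleType())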
Upvotes: 0
Views: 2925
Reputation: 10082
First, you don't need a UDF to fetch an item from the array; getItem does that directly. Second, just use na.fill to replace NULL values with a number (zero, in your case):
df.withColumn("exploded" , explode(col("products") ) )
.withColumn("score", col("exploded").getItem(1) )
.na.fill(0)
.select(
mean_(col("score") ).alias("mean") ,
stddev_(col("score") ).alias("stddev")
)
.show()
+----+------------------+
|mean| stddev|
+----+------------------+
| 9.2|11.734564329364767|
+----+------------------+
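Note that the mean here is 9.2, which is consistent with the two fractional scores (520.70 and 45.556) arriving as null in this run and being zeroed by na.fill(0): (0 + 2 + 0 + 23 + 21) / 5 = 9.2.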
To get the values separately into variables:
row = df.withColumn("exploded", explode(col("products")))\
    .withColumn("score", col("exploded").getItem(1))\
    .na.fill(0)\
    .select(
        mean_(col("score")).alias("mean"),
        stddev_(col("score")).alias("stddev")
    )\
    .first()
mean = row.mean
stddev = row.stddev
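One caveat: explode drops rows whose array is empty, so product 693 never reaches the aggregation at all and na.fill(0) cannot give it a 0 score. If empty products should count as 0, explode_outer (available since Spark 2.2) keeps those rows as a single null, which na.fill(0) then zeroes; a minimal sketch under that assumption:

from pyspark.sql.functions import explode_outer

# explode_outer emits one null row for an empty or null array, so
# product 693 survives and na.fill(0) turns its score into 0
row = df.withColumn("exploded", explode_outer(col("products")))\
    .withColumn("score", col("exploded").getItem(1))\
    .na.fill(0)\
    .select(
        mean_(col("score")).alias("mean"),
        stddev_(col("score")).alias("stddev")
    )\
    .first()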
Upvotes: 2