Reputation: 53876
I use this code to compute the geometric mean of all rows within a DataFrame:
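from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, randn, sqrt
# In the old pyspark shell, sqlContext.range(0, 10) works the same way
spark = SparkSession.builder.getOrCreate()
df = spark.range(0, 10)
df = df.select(rand(seed=10).alias("c1"), randn(seed=27).alias("c2"))
df.show()
# Python's built-in sum adds the Column objects into one expression: sqrt(c1 + c2) for each row
newdf = df.withColumn('total', sqrt(sum(df[col] for col in df.columns)))
newdf.show()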
This displays:
To compute the geometric mean of the columns instead of the rows, I think this code should suffice:
newdf = df.withColumn('total', sqrt(sum(df[row] for row in df.rows)))
But this throws the error: NameError: global name 'row' is not defined
So it appears the API for accessing rows is not the same as the API for accessing columns.
Should I reshape the data so that rows become columns and then reuse the working algorithm, newdf = df.withColumn('total', sqrt(sum(df[col] for col in df.columns))), or is there a solution that processes the rows and columns as they are?
Upvotes: 1
Views: 2672
Reputation: 21766
I am not sure your definition of the geometric mean is correct: your code computes the square root of the sum of each row's values. According to Wikipedia, the geometric mean is defined as the nth root of the product of n numbers. According to the same page, the geometric mean can also be expressed as the exponential of the arithmetic mean of logarithms. I will use this second form to calculate the geometric mean of each column.
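As a quick sanity check of that identity, here is a plain-Python sketch (not Spark code; the helper names and sample values are made up for illustration) that computes the geometric mean both ways:

```python
import math

def gmean_product(values):
    # nth root of the product of n numbers (math.prod needs Python 3.8+)
    return math.prod(values) ** (1.0 / len(values))

def gmean_log(values):
    # exponential of the arithmetic mean of the logarithms
    return math.exp(sum(math.log(v) for v in values) / len(values))

sample = [1.0, 2.0, 4.0]  # hypothetical sample values
print(gmean_product(sample), gmean_log(sample))
```

Both give 2 up to floating-point rounding. The log form is the one that maps onto Spark's aggregate functions, because it turns the product into a mean (avg), and it also avoids overflow or underflow of the product on long columns.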
You can calculate the geometric mean by combining the column data for c1 and c2 into a new column called value, storing the source column name in a column called column. After the data has been reformatted, the geometric mean is determined by grouping by column (c1 or c2) and calculating the exponential of the arithmetic mean of the logarithm of value for each group. Values without a real logarithm (zero or negative, which occur in c2 because randn can return negative numbers) are ignored in this calculation: Spark's log returns null for them, and avg skips nulls.
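The reshape-then-group logic can be mirrored in plain Python to show what the Spark pipeline does without needing a cluster. This is a hedged sketch: the helper geometric_mean_by_column and the rows data are hypothetical, and skipping non-positive values stands in for the nulls that Spark's log produces.

```python
import math
from collections import defaultdict

def geometric_mean_by_column(rows, columns):
    # Unpivot: each wide row becomes (column, value) pairs, like explode(array(struct(...)))
    pairs = [(c, row[c]) for row in rows for c in columns]
    # Group logarithms by source column, skipping values that have no real logarithm
    logs = defaultdict(list)
    for column, value in pairs:
        if value is not None and value > 0:
            logs[column].append(math.log(value))
    # exp(avg(log(value))) for each group
    return {column: math.exp(sum(ls) / len(ls)) for column, ls in logs.items()}

rows = [{"c1": 1.0, "c2": 2.0}, {"c1": 4.0, "c2": -1.0}, {"c1": 16.0, "c2": 8.0}]  # hypothetical data
print(geometric_mean_by_column(rows, ["c1", "c2"]))
```

Both columns come out as 4 (up to rounding); the -1.0 in c2 is dropped. One difference from Spark: a column with no positive values is simply absent from this dict, whereas Spark would still produce a group for it with a null result.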
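from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# In the old pyspark shell, sqlContext.range(0, 10) works the same way
spark = SparkSession.builder.getOrCreate()
df = spark.range(0, 10)
df = df.select(F.rand(seed=10).alias("c1"), F.randn(seed=27).alias("c2"))
# Tag each row with an id so it can still be identified after reshaping
df_id = df.withColumn("id", F.monotonically_increasing_id())
# One (column, value) struct per original column, exploded into one row per struct
kvp = F.explode(F.array([F.struct(F.lit(c).alias("column"), F.col(c).alias("value")) for c in df.columns])).alias("kvp")
df_pivoted = df_id.select(["id", kvp]).select(["id", "kvp.column", "kvp.value"])
# exp(avg(log(value))) per column; log gives null for value <= 0 and avg skips nulls.
# alias() names the result directly; the auto-generated name differs between Spark versions.
df_geometric_mean = df_pivoted.groupBy("column").agg(F.exp(F.avg(F.log("value"))).alias("geometric_mean"))
df_geometric_mean.show()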
This returns:
+------+-------------------+
|column|     geometric_mean|
+------+-------------------+
|    c1|0.25618961513533134|
|    c2|  0.415119290980354|
+------+-------------------+
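Apart from the number of digits printed, these geometric means match the ones returned by scipy, provided the non-positive values are excluded there as well.
from scipy.stats.mstats import gmean
# Collect once instead of once per column
rows = df.collect()
# Keep only positive values, mirroring the nulls that Spark's log produces for values <= 0
c1 = [x['c1'] for x in rows if x['c1'] > 0]
c2 = [x['c2'] for x in rows if x['c2'] > 0]
print('c1 : {0}\nc2 : {1}'.format(gmean(c1), gmean(c2)))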
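If scipy is not available, Python 3.8+ ships statistics.geometric_mean, which can serve as a similar cross-check. This is a sketch with hypothetical values standing in for a collected column; like the scipy snippet, it filters out non-positive values first, since statistics.geometric_mean raises StatisticsError for zero or negative inputs.

```python
from statistics import geometric_mean

values = [0.5, -0.3, 2.0, 0.0, 1.0]  # hypothetical stand-in for a collected column
# Mirror Spark, where log(v) is null (and therefore skipped by avg) for v <= 0
positive = [v for v in values if v > 0]
print(geometric_mean(positive))
```

The positive values 0.5, 2.0 and 1.0 multiply to 1, so the result is 1 up to floating-point rounding.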
This snippet returns:
c1 : 0.256189615135
c2 : 0.41511929098
Upvotes: 5