Chn

Reputation: 369

Spark dataframe aggregate on multiple columns

I am working with PySpark code. My dataframe is:

+-------+--------+--------+--------+--------+
|element|collect1|collect2|collect3|collect4|
+-------+--------+--------+--------+--------+
|A1     |   1.02 |    2.6 |   5.21 |    3.6 |
|A2     |   1.61 |   2.42 |   4.88 |   6.08 |
|B1     |   1.66 |   2.01 |    5.0 |    4.3 |
|C2     |   2.01 |   1.85 |   3.42 |   4.44 |
+-------+--------+--------+--------+--------+

I need to find the mean and stddev for each element by aggregating across all the collectX columns. The final result should look like this:

+-------+--------+--------+
|element|mean    |stddev  |
+-------+--------+--------+
|A1     |   3.11 |   1.76 |
|A2     |   3.75 |   2.09 |
|B1     |   3.24 |   1.66 |
|C2     |   2.93 |   1.23 |
+-------+--------+--------+

The code below, df.groupBy("element").mean().show(), breaks the mean down per individual column. Instead of doing this for each column, is it possible to roll it up across all the columns?

+-------+-------------+-------------+-------------+-------------+
|element|avg(collect1)|avg(collect2)|avg(collect3)|avg(collect4)|
+-------+-------------+-------------+-------------+-------------+
|A1     |   1.02      |   2.6       |   5.21      |    3.6      |
|A2     |   1.61      |   2.42      |   4.88      |   6.08      |
|B1     |   1.66      |   2.01      |    5.0      |    4.3      |
|C2     |   2.01      |   1.85      |   3.42      |   4.44      |
+-------+-------------+-------------+-------------+-------------+

I tried to make use of the describe function, since it has the complete set of aggregation functions, but the result is still shown per individual column: df.groupBy("element").mean().describe().show()

Thanks.

Upvotes: 1

Views: 4513

Answers (2)

the.malkolm

Reputation: 2421

Spark lets you gather all sorts of stats per column, but you are trying to calculate stats per row. In this case you can hack something together with a udf. Here is an example :D

$ pyspark
>>> from pyspark.sql.types import DoubleType
>>> from pyspark.sql.functions import array, udf
>>>
>>> mean = udf(lambda v: sum(v) / len(v), DoubleType())
>>> df = sc.parallelize([['A1', 1.02, 2.6, 5.21, 3.6], ['A2', 1.61, 2.42, 4.88, 6.08]]).toDF(['element', 'collect1', 'collect2', 'collect3', 'collect4'])
>>> df.show()
+-------+--------+--------+--------+--------+
|element|collect1|collect2|collect3|collect4|
+-------+--------+--------+--------+--------+
|     A1|    1.02|     2.6|    5.21|     3.6|
|     A2|    1.61|    2.42|    4.88|    6.08|
+-------+--------+--------+--------+--------+
>>> df.select('element', mean(array(df.columns[1:])).alias('mean')).show()
+-------+------+
|element|  mean|
+-------+------+
|     A1|3.1075|
|     A2|3.7475|
+-------+------+
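
The same udf pattern extends to the sample standard deviation the question also asks for. Here is a minimal sketch that reuses the mean udf and df defined above; the stddev_row helper and its name are only for illustration:

from math import sqrt
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import array, udf

# sample (n - 1) standard deviation of one row's array of values
# (stddev_row is a hypothetical helper, not part of the answer above)
def stddev_row(v):
    m = sum(v) / len(v)
    return sqrt(sum((x - m) ** 2 for x in v) / (len(v) - 1))

stddev_udf = udf(stddev_row, DoubleType())

cols = array(df.columns[1:])
df.select('element',
          mean(cols).alias('mean'),
          stddev_udf(cols).alias('stddev')).show()

With the sample data above this gives roughly 1.76 for A1 and 2.09 for A2, which matches the expected output in the question.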

Upvotes: 1

David Griffin

Reputation: 13927

Did you try just adding the columns together and possibly dividing by 4?

SELECT element,
       avg((collect1 + collect2 + collect3 + collect4) / 4),
       stddev((collect1 + collect2 + collect3 + collect4) / 4)
FROM ...   -- your table or temp view
GROUP BY element

That's not going to do exactly what you want, but you get the idea.

Not sure what language you're using, but you can always build the query on the fly if you aren't happy with hardcoding it:

val collectColumns = df.columns.filter(_.startsWith("collect"))
val stmnt = "SELECT avg((" + collectColumns.mkString(" + ") + ") / " + collectColumns.length + ")"

You get the idea.
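
In PySpark the same on-the-fly construction might look like the sketch below. It assumes a Spark 2.x session named spark, registers the dataframe under the hypothetical view name df, and, as noted above, it aggregates the per-row average rather than every individual value:

# collect the columns to aggregate and build the per-row average expression
collect_columns = [c for c in df.columns if c.startswith("collect")]
row_avg = "(" + " + ".join(collect_columns) + ") / " + str(len(collect_columns))

df.createOrReplaceTempView("df")  # hypothetical view name
query = ("SELECT element, avg(" + row_avg + ") AS mean, "
         "stddev(" + row_avg + ") AS stddev "
         "FROM df GROUP BY element")
spark.sql(query).show()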

Upvotes: 0
