Richa Banker

Reputation: 503

Multiple Aggregate operations on the same column of a spark dataframe

I have three Arrays of string type containing the following information:

I am trying to use Spark DataFrames to achieve this. Spark DataFrames provide an agg() method where you can pass a Map[String, String] (of column name to aggregate operation) as input; however, I want to perform different aggregation operations on the same column of the data. Any suggestions on how to achieve this?
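To illustrate the limitation (a minimal sketch, assuming an active SparkSession; the column names "k" and "v" and the values are hypothetical): the PySpark equivalent of the Map[String, String] form keys the aggregations by column name, so a second operation on the same column silently overwrites the first.

df = spark.createDataFrame([(1, 3.0), (1, 4.0), (2, -5.0)], ["k", "v"])

# The dict is keyed by column name, so the duplicate key "v" collapses to a
# single entry and only max(v) is computed; the min is silently dropped.
df.groupBy("k").agg({"v": "min", "v": "max"}).show()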

Upvotes: 43

Views: 49934

Answers (6)

aishik roy chaudhury

Reputation: 319

Do something like this:

from pyspark.sql import functions as F

df.groupBy('groupByColName') \
  .agg(F.sum('col1').alias('col1_sum'),
       F.max('col2').alias('col2_max'),
       F.avg('col2').alias('col2_avg')) \
  .show()

Upvotes: 8

Ashutosh Kushwaha

Reputation: 11

For example, if you want to count the percentage of zeroes in each column of a PySpark DataFrame, you can build an expression for each column and pass them all to agg():

from pyspark.sql.functions import col, count, sum as sum_

# number of rows in column c that are equal to zero
def count_zero_percentage(c):
    pred = col(c) == 0
    return sum_(pred.cast("integer"))

# divide each zero count by the total row count and alias with the column name
df.agg(*[(count_zero_percentage(c) / count('*')).alias(c) for c in df.columns]).show()

Upvotes: 0

Here is another straightforward way to apply different aggregate functions to the same column in Scala (this has been tested in Azure Databricks).

import org.apache.spark.sql.functions.{avg, max, min, round}

val groupByColName = "Store"
val colName = "Weekly_Sales"

df.groupBy(groupByColName)
  .agg(min(colName),
       max(colName),
       round(avg(colName), 2))
  .show()

Upvotes: 1

user2458922

Reputation: 1721

case class soExample(firstName: String, lastName: String, Amount: Int)
val df =  Seq(soExample("me", "zack", 100)).toDF

import org.apache.spark.sql.functions._

val grouped = df.groupBy("firstName", "lastName").agg(
     sum("Amount"),
     mean("Amount"),
     stddev("Amount"),
     count(lit(1)).alias("numOfRecords")
   )

display(grouped)

// Courtesy Zach ..

Simplified answer from Zach for a post marked as a duplicate: Spark Scala Data Frame to have multiple aggregation of single Group By.

Upvotes: -2

zero323

Reputation: 330063

Scala:

You can, for example, map over a list of functions with a defined mapping from name to function:

import org.apache.spark.sql.functions.{col, min, max, avg}
import org.apache.spark.sql.Column

val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v")
val mapping: Map[String, Column => Column] = Map(
  "min" -> min, "max" -> max, "mean" -> avg)

val groupBy = Seq("k")
val aggregate = Seq("v")
val operations = Seq("min", "max", "mean")
val exprs = aggregate.flatMap(c => operations.map(f => mapping(f)(col(c))))

df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show
// +---+------+------+------+
// |  k|min(v)|max(v)|avg(v)|
// +---+------+------+------+
// |  1|   3.0|   3.0|   3.0|
// |  2|  -5.0|  -5.0|  -5.0|
// +---+------+------+------+

or

df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show

Unfortunately the parser used internally by SQLContext is not exposed publicly, but you can always try to build plain SQL queries:

df.registerTempTable("df")
val groupExprs = groupBy.mkString(",")
val aggExprs = aggregate.flatMap(c => operations.map(
  f => s"$f($c) AS ${c}_${f}")
).mkString(",")

sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs")

Python:

from pyspark.sql.functions import mean, sum, max, col

df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"])
groupBy = ["k"]
aggregate = ["v"] 
funs = [mean, sum, max]

exprs = [f(col(c)) for f in funs for c in aggregate]

# or equivalent df.groupby(groupBy).agg(*exprs)
df.groupby(*groupBy).agg(*exprs)
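If you also want predictable output column names (the defaults look like avg(v)), a hedged variation on the expressions above, reusing the same df, groupBy, aggregate and funs, is to alias each expression; the name list here is illustrative:

# pair each function with a display name and alias the resulting columns
names = ["mean", "sum", "max"]
exprs = [f(col(c)).alias(c + "_" + n) for f, n in zip(funs, names) for c in aggregate]

df.groupby(*groupBy).agg(*exprs)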


Upvotes: 72

Rick

Reputation: 2110

For those who wonder how @zero323's answer can be written without a list comprehension in Python:

from pyspark.sql.functions import min, max, col
# init your spark dataframe

expr = [min(col("valueName")),max(col("valueName"))]
df.groupBy("keyName").agg(*expr)

Upvotes: 9
