Reputation: 503
I have three Arrays of string type containing the following information:
I am trying to use Spark DataFrames to achieve this. A DataFrame's agg() accepts a Map[String, String] (of column name to aggregate operation) as input, but since the map is keyed by column name, it only allows one operation per column. I want to perform several different aggregation operations on the same column of the data. Any suggestions on how to achieve this?
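To make the issue concrete, here is a rough sketch of what I am running into (PySpark shown here; the Scala Map[String, String] overload has the same one-operation-per-column restriction, and the toy data and column names k and v are just placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([(1, 3.0), (1, 3.0), (2, -5.0)], ["k", "v"])

# The dict is keyed by column name, so the second entry for "v" overwrites the
# first and only max(v) is computed
df.groupBy("k").agg({"v": "min", "v": "max"}).show()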
Upvotes: 43
Views: 49934
Reputation: 319
Do something like this:
from pyspark.sql import functions as F

df.groupBy('groupByColName') \
  .agg(F.sum('col1').alias('col1_sum'),
       F.max('col2').alias('col2_max'),
       F.avg('col2').alias('col2_avg')) \
  .show()
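For a quick test, a toy DataFrame with matching placeholder column names could look like this (assuming an active SparkSession named spark):

df = spark.createDataFrame(
    [("a", 1, 10.0), ("a", 2, 20.0), ("b", 3, 30.0)],
    ["groupByColName", "col1", "col2"],
)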
Upvotes: 8
Reputation: 11
For example, if you want to compute the percentage of zeroes in each column of a PySpark DataFrame, you can build one expression per column and pass them all to agg():
from pyspark.sql.functions import col, count, sum

def count_zero_percentage(c):
    # 1 where the value is zero, 0 otherwise; sum those and divide by the row count
    pred = col(c) == 0
    return (sum(pred.cast("integer")) / count('*')).alias(c)

df.agg(*[count_zero_percentage(c) for c in df.columns]).show()
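Hypothetical usage on a small toy DataFrame (the column names a and b and the data are made up for illustration; assumes an active SparkSession named spark):

sample = spark.createDataFrame([(0, 1), (2, 3), (0, 0)], ["a", "b"])
sample.agg(*[count_zero_percentage(c) for c in sample.columns]).show()
# a: 2 zeroes out of 3 rows ~ 0.667, b: 1 zero out of 3 rows ~ 0.333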
Upvotes: 0
Reputation: 771
Here is another straightforward way to apply different aggregate functions to the same column in Scala (this has been tested in Azure Databricks):
import org.apache.spark.sql.functions.{avg, max, min, round}

val groupByColName = "Store"
val colName = "Weekly_Sales"

df.groupBy(groupByColName)
  .agg(min(colName),
       max(colName),
       round(avg(colName), 2))
  .show()
Upvotes: 1
Reputation: 1721
import org.apache.spark.sql.functions._

case class soExample(firstName: String, lastName: String, Amount: Int)
val df = Seq(soExample("me", "zack", 100)).toDF

val grouped = df.groupBy("firstName", "lastName").agg(
  sum("Amount"),
  mean("Amount"),
  stddev("Amount"),
  count(lit(1)).alias("numOfRecords")
)

display(grouped)
// Courtesy of Zach
Zach's simplified answer for a question marked as a duplicate: Spark Scala Data Frame to have multiple aggregation of single Group By
Upvotes: -2
Reputation: 330063
Scala:
You can, for example, map over a list of functions with a defined mapping
from name to function:
import org.apache.spark.sql.functions.{avg, col, max, min}
import org.apache.spark.sql.Column

val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v")

val mapping: Map[String, Column => Column] = Map(
  "min" -> min, "max" -> max, "mean" -> avg)
val groupBy = Seq("k")
val aggregate = Seq("v")
val operations = Seq("min", "max", "mean")
val exprs = aggregate.flatMap(c => operations.map(f => mapping(f)(col(c))))
df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show
// +---+------+------+------+
// | k|min(v)|max(v)|avg(v)|
// +---+------+------+------+
// | 1| 3.0| 3.0| 3.0|
// | 2| -5.0| -5.0| -5.0|
// +---+------+------+------+
or
df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show
Unfortunately the parser which is used internally by SQLContext
is not exposed publicly, but you can always try to build plain SQL queries:
df.registerTempTable("df")
val groupExprs = groupBy.mkString(",")
val aggExprs = aggregate.flatMap(c => operations.map(
f => s"$f($c) AS ${c}_${f}")
).mkString(",")
sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs")
Python:
from pyspark.sql.functions import mean, sum, max, col
df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"])
groupBy = ["k"]
aggregate = ["v"]
funs = [mean, sum, max]
exprs = [f(col(c)) for f in funs for c in aggregate]
# or, equivalently: df.groupby(groupBy).agg(*exprs)
df.groupby(*groupBy).agg(*exprs)
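The same plain-SQL trick can be used from Python as well; a sketch reusing the groupBy and aggregate lists above and the same sqlContext, with an operations list that mirrors funs:

operations = ["mean", "sum", "max"]

df.registerTempTable("df")
groupExprs = ",".join(groupBy)
aggExprs = ",".join(
    "{0}({1}) AS {1}_{0}".format(f, c) for c in aggregate for f in operations
)
sqlContext.sql("SELECT {0}, {1} FROM df GROUP BY {0}".format(groupExprs, aggExprs))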
See also:
Upvotes: 72
Reputation: 2110
For those wondering how @zero323's answer can be written without a list comprehension in Python:
from pyspark.sql.functions import min, max, col
# init your spark dataframe
expr = [min(col("valueName")), max(col("valueName"))]
df.groupBy("keyName").agg(*expr)
Upvotes: 9