Reputation: 1354
I have list of aggregation function, alias and other as JSON configuration, like
{
"aggregation": [{
"alias_column_name1": {
"sum": "<columnName1>"
}
}, {
"alias_column_name2": {
"sum": "<columnName1>"
}
}]
}
Currently I have executing same by following code:
val col1:Column = sum(<dataframeName>(<columnName1>)).alias(<alias_column_name1>)
val col2:Column = sum(<dataframeName>(<columnName2>)).alias(<alias_column_name2>)
dataframe.groupby(..).agg(col1, col2)
But I have many aggregation configuration and I want to pass List of such in aggregation method, like
val colList = List[Column](col1, col2)
dataframe.groupby(..).agg(colList)
How can I achieve same? Thanks
Versions:
Scala : 2.11
Spark : 2.2.2
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.2"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.2.2"
Upvotes: 0
Views: 2255
Reputation: 4631
Separate list of columns and functions
Let's say you have a list of functions:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
val funs: Seq[Column => Column] = Seq(sum _, min _, max _)
and a list of columns
val cols: Seq[Column] = Seq($"y", $"z")
and a dataset
val df = Seq((1, 2, 3), (1, 4, 5) ).toDF("x", "y", "z")
you can combine both
val exprs = for { c <- cols; f <- funs} yield f(c)
and then
df.groupBy($"x").agg(exprs.head, exprs.tail: _*)
The same thing could be done in PySpark:
from pyspark.sql import functions as F
funs = [F.sum, F.min, F.max]
cols = ["y", "z"]
df = spark.createDataFrame([(1, 2, 3), (1, 4, 5)], ("x", "y", "z"))
df.groupBy("x").agg(*[f(c) for c in cols for f in funs])
Predefined list of operations for each column
If you want start with predefined set of aliases, columns and functions, as the one shown in your question, it might be easier to just restructure it to
trait AggregationOp {
def expr: Column
}
case class FuncAggregationOp(c: Column, func: Column => Column, alias: String
) extends AggregationOp {
def expr = func(c).alias(alias)
}
val ops: Seq[AggregationOp] = Seq(
FuncAggregationOp($"y", sum _, "alias_column_name1"),
FuncAggregationOp($"z", sum _, "alias_column_name2")
)
val exprs = ops.map(_.expr)
df.groupBy($"x").agg(exprs.head, exprs.tail: _*)
You can easily adjust this to handle other cases:
case class StringAggregationOp(c: String, func: String, alias: String
) extends AggregationOp {
def expr = org.apache.spark.sql.functions.expr(s"${func}(`${c}`)").alias(alias)
}
val ops: Seq[AggregationOp] = Seq(
StringAggregationOp("y", "sum", "alias_column_name1"),
StringAggregationOp("z", "sum", "alias_column_name2")
)
Python equivalent could be something like this:
from collections import namedtuple
from pyspark.sql import functions as F
class AggregationOp(namedtuple("Op", ["c", "func", "alias"])):
def expr(self):
if callable(self.func):
return self.func(self.c).alias(self.alias)
else:
return F.expr("{func}(`{c}`)".format
(func = self.func, c = self.c)).alias(self.alias)
ops = [
AggregationOp("y", "sum", "alias_column_name1"),
AggregationOp("z", "sum", "alias_column_name2")
]
df.groupBy("x").agg(*[op.expr() for op in ops])
Related question:
Upvotes: 3