user811602

Reputation: 1354

Spark DataFrame: Multiple aggregation functions on multiple columns

I have a list of aggregation functions, aliases and other settings as a JSON configuration, like:

{
    "aggregation": [{
        "alias_column_name1": {
            "sum": "<columnName1>"
        }
    }, {
        "alias_column_name2": {
            "sum": "<columnName1>"
        }
    }]
}

Currently I am doing this with the following code:

val col1: Column = sum(<dataframeName>(<columnName1>)).alias(<alias_column_name1>)
val col2: Column = sum(<dataframeName>(<columnName2>)).alias(<alias_column_name2>)
dataframe.groupBy(..).agg(col1, col2)

But I have many aggregation configurations and I want to pass a List of such columns to the agg method, like:

val colList = List[Column](col1, col2)
dataframe.groupBy(..).agg(colList)

How can I achieve this? Thanks.

Versions:

Scala : 2.11
Spark : 2.2.2
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.2"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.2.2"

Upvotes: 0

Views: 2255

Answers (1)

10465355

Reputation: 4631

Separate list of columns and functions

Let's say you have a list of functions:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import spark.implicits._  // for the $"..." column syntax; assumes `spark` is the active SparkSession

val funs: Seq[Column => Column] = Seq(sum _, min _, max _)

and a list of columns

val cols: Seq[Column] = Seq($"y", $"z")

and a dataset

val df = Seq((1, 2, 3), (1, 4, 5)).toDF("x", "y", "z")

you can combine both

val exprs = for { c <- cols; f <- funs } yield f(c)

and then

df.groupBy($"x").agg(exprs.head, exprs.tail: _*)

The same thing could be done in PySpark:

from pyspark.sql import functions as F

funs = [F.sum, F.min, F.max]
cols = ["y", "z"]

df = spark.createDataFrame([(1, 2, 3), (1, 4, 5)], ("x", "y", "z"))

df.groupBy("x").agg(*[f(c) for c in cols for f in funs])

Predefined list of operations for each column

If you want to start with a predefined set of aliases, columns and functions, like the one shown in your question, it might be easier to just restructure it into something like:

trait AggregationOp {
  def expr: Column
}

case class FuncAggregationOp(c: Column, func: Column => Column, alias: String)
    extends AggregationOp {
  def expr = func(c).alias(alias)
}

val ops: Seq[AggregationOp] = Seq(
   FuncAggregationOp($"y", sum _, "alias_column_name1"),
   FuncAggregationOp($"z", sum _, "alias_column_name2")
)
val exprs = ops.map(_.expr)

df.groupBy($"x").agg(exprs.head, exprs.tail: _*)

You can easily adjust this to handle other cases:

case class StringAggregationOp(c: String, func: String, alias: String)
    extends AggregationOp {
  def expr = org.apache.spark.sql.functions.expr(s"${func}(`${c}`)").alias(alias)
}

val ops: Seq[AggregationOp] = Seq(
   StringAggregationOp("y", "sum", "alias_column_name1"),
   StringAggregationOp("z", "sum", "alias_column_name2")
)
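
To drive this from the JSON configuration in the question, one possibility is to parse it into StringAggregationOp instances. This is only a sketch: it assumes exactly the structure you posted, uses json4s (already on Spark's classpath), and opsFromJson / jsonConfig are illustrative names:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats: Formats = DefaultFormats

// Sketch: expects {"aggregation": [{"<alias>": {"<function>": "<column>"}}, ...]}
// with a single function/column pair per alias, as in the question.
def opsFromJson(config: String): Seq[AggregationOp] =
  (parse(config) \ "aggregation")
    .extract[Seq[Map[String, Map[String, String]]]]
    .flatMap(_.map { case (alias, funcToCol) =>
      val (func, column) = funcToCol.head
      StringAggregationOp(column, func, alias)
    })

val jsonConfig = """{"aggregation": [
  {"alias_column_name1": {"sum": "y"}},
  {"alias_column_name2": {"sum": "z"}}
]}"""

val exprs = opsFromJson(jsonConfig).map(_.expr)
df.groupBy($"x").agg(exprs.head, exprs.tail: _*)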

A Python equivalent could be something like this:

from collections import namedtuple
from pyspark.sql import functions as F

class AggregationOp(namedtuple("Op", ["c", "func", "alias"])):
    def expr(self):
        if callable(self.func):
            return self.func(self.c).alias(self.alias)
        else:
            return F.expr("{func}(`{c}`)".format
                (func = self.func, c = self.c)).alias(self.alias)

ops = [
    AggregationOp("y", "sum", "alias_column_name1"),
    AggregationOp("z", "sum", "alias_column_name2")
]

 df.groupBy("x").agg(*[op.expr() for op in ops])


Upvotes: 3
