Reputation: 1056
I am trying to compute min and max inside the agg of a groupByKey operation. The code looks like this:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.TypedColumn
import org.apache.spark.sql.expressions.scalalang.typed.{count => typedCount, sum => typedSum}

inputFlowRecords
  .groupByKey(inputFlowRecord => inputFlowRecord.FlowInformation)
  .agg(
    typedSum[InputFlowRecordV1](_.FlowStatistics.minFlowTime).name("minFlowTime"),
    typedSum[InputFlowRecordV1](_.FlowStatistics.maxFlowTime).name("maxFlowTime"),
    typedSum[InputFlowRecordV1](_.FlowStatistics.flowStartedCount).name("flowStartedCount"),
    typedSum[InputFlowRecordV1](_.FlowStatistics.flowEndedCount).name("flowEndedCount"),
    typedSum[InputFlowRecordV1](_.FlowStatistics.packetsCountFromSource).name("packetsCountFromSource"),
    typedSum[InputFlowRecordV1](_.FlowStatistics.bytesCountFromSource).name("bytesCountFromSource"),
    typedSum[InputFlowRecordV1](_.FlowStatistics.packetsCountFromDestination).name("packetsCountFromDestination"),
    typedSum[InputFlowRecordV1](_.FlowStatistics.bytesCountFromDestination).name("bytesCountFromDestination"))
I am facing 2 problems here:

1. When I try to use the org.apache.spark.sql.functions.min/max operations, the error says TypedColumns should be used. How can this be solved?
2. The agg function lets us specify only 4 columns max. inside it, while I have 8 columns to aggregate. How can this be achieved?

Upvotes: 2
Views: 611
Reputation: 10693
Unfortunately it seems that both limitations are real: the built-in typed aggregators only provide avg, count and sum (there is no typed min/max), and the typed agg only has overloads for up to four columns.
In your case a reasonable thing to do might be to define your own specialized aggregator that aggregates whole InputFlowStatistics objects, so you only pass a single argument to agg (see the sketch below).
Typed aggregators are defined in typedaggregators.scala, and the Spark documentation provides some information on creating custom ones.
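For illustration, here is a minimal sketch of such an aggregator. The case classes and field names below are hypothetical stand-ins for the asker's schema, which is not shown in the question:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical, simplified versions of the record types.
case class FlowStatistics(
    minFlowTime: Long, maxFlowTime: Long,
    flowStartedCount: Long, flowEndedCount: Long,
    packetsCountFromSource: Long, bytesCountFromSource: Long,
    packetsCountFromDestination: Long, bytesCountFromDestination: Long)

case class InputFlowRecordV1(flowInformation: String, flowStatistics: FlowStatistics)

// One aggregator that folds all eight statistics at once,
// so agg needs only a single TypedColumn.
object FlowStatsAggregator
    extends Aggregator[InputFlowRecordV1, FlowStatistics, FlowStatistics] {

  // Neutral element: extreme values for min/max, zeros for the sums.
  def zero: FlowStatistics =
    FlowStatistics(Long.MaxValue, Long.MinValue, 0L, 0L, 0L, 0L, 0L, 0L)

  def reduce(buffer: FlowStatistics, record: InputFlowRecordV1): FlowStatistics =
    merge(buffer, record.flowStatistics)

  def merge(x: FlowStatistics, y: FlowStatistics): FlowStatistics =
    FlowStatistics(
      math.min(x.minFlowTime, y.minFlowTime),
      math.max(x.maxFlowTime, y.maxFlowTime),
      x.flowStartedCount + y.flowStartedCount,
      x.flowEndedCount + y.flowEndedCount,
      x.packetsCountFromSource + y.packetsCountFromSource,
      x.bytesCountFromSource + y.bytesCountFromSource,
      x.packetsCountFromDestination + y.packetsCountFromDestination,
      x.bytesCountFromDestination + y.bytesCountFromDestination)

  def finish(reduction: FlowStatistics): FlowStatistics = reduction

  def bufferEncoder: Encoder[FlowStatistics] = Encoders.product[FlowStatistics]
  def outputEncoder: Encoder[FlowStatistics] = Encoders.product[FlowStatistics]
}

// Usage: a single typed column inside agg.
// inputFlowRecords
//   .groupByKey(_.flowInformation)
//   .agg(FlowStatsAggregator.toColumn.name("stats"))

Because all eight statistics travel through one buffer, this stays type-safe, gives you real min/max semantics, and sidesteps the four-column limit of agg.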
Upvotes: 2