Sayantan Ghosh

Reputation: 1056

Using min/max operations in groupByKey on a spark dataset

I am trying to compute min and max inside the agg of a groupByKey operation. The code looks like this:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.TypedColumn
import org.apache.spark.sql.expressions.scalalang.typed.{
  count => typedCount, 
  sum => typedSum }

inputFlowRecords.groupByKey(inputFlowRecrd => inputFlowRecrd.FlowInformation)
  .agg(typedSum[InputFlowRecordV1](_.FlowStatistics.minFlowTime).name("minFlowTime"),
      typedSum[InputFlowRecordV1](_.FlowStatistics.maxFlowTime).name("maxFlowTime"),
      typedSum[InputFlowRecordV1](_.FlowStatistics.flowStartedCount).name("flowStartedCount"),
      typedSum[InputFlowRecordV1](_.FlowStatistics.flowEndedCount).name("flowEndedCount"),
      typedSum[InputFlowRecordV1](_.FlowStatistics.packetsCountFromSource).name("packetsCountFromSource"),
      typedSum[InputFlowRecordV1](_.FlowStatistics.bytesCountFromSource).name("bytesCountFromSource"),
      typedSum[InputFlowRecordV1](_.FlowStatistics.packetsCountFromDestination).name("packetsCountFromDestination"),
      typedSum[InputFlowRecordV1](_.FlowStatistics.bytesCountFromDestination).name("bytesCountFromDestination"))

I am facing 2 problems here:

  1. Instead of sum, I want to take the min/max of a few columns. When I try to use org.apache.spark.sql.functions.min/max, the error says that TypedColumns should be used. How can this be solved?
  2. The agg function accepts at most 4 columns, while I have 8 columns to aggregate. How can this be achieved?

Upvotes: 2

Views: 611

Answers (1)

Kombajn zbożowy

Reputation: 10693

Unfortunately it seems that:

In your case, a reasonable thing to do might be to define your own specialized aggregator that aggregates InputFlowStatistics objects, so that you only need to pass a single argument to agg.

The built-in typed aggregators are defined in typedaggregators.scala, and the Spark documentation provides some information on creating custom ones.
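A minimal sketch of such a custom aggregator might look like the following. The question does not show the case classes, so the shapes of InputFlowRecordV1 and InputFlowStatistics here are assumptions (all statistics as Long, the key as a String), and FlowStatisticsAggregator is a name made up for illustration. It takes min/max of the two time fields and sums the rest in a single pass:

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical shapes: field names follow the question, field types are guesses.
case class InputFlowStatistics(
  minFlowTime: Long,
  maxFlowTime: Long,
  flowStartedCount: Long,
  flowEndedCount: Long,
  packetsCountFromSource: Long,
  bytesCountFromSource: Long,
  packetsCountFromDestination: Long,
  bytesCountFromDestination: Long)

case class InputFlowRecordV1(FlowInformation: String, FlowStatistics: InputFlowStatistics)

// Hypothetical aggregator: one buffer holds all eight statistics.
object FlowStatisticsAggregator
    extends Aggregator[InputFlowRecordV1, InputFlowStatistics, InputFlowStatistics] {

  // Neutral element: min starts at Long.MaxValue, max at Long.MinValue, sums at 0.
  def zero: InputFlowStatistics =
    InputFlowStatistics(Long.MaxValue, Long.MinValue, 0L, 0L, 0L, 0L, 0L, 0L)

  // Fold one input record into the running buffer.
  def reduce(acc: InputFlowStatistics, rec: InputFlowRecordV1): InputFlowStatistics =
    merge(acc, rec.FlowStatistics)

  // Combine two partial buffers: min/max for the time fields, sums for the counters.
  def merge(a: InputFlowStatistics, b: InputFlowStatistics): InputFlowStatistics =
    InputFlowStatistics(
      math.min(a.minFlowTime, b.minFlowTime),
      math.max(a.maxFlowTime, b.maxFlowTime),
      a.flowStartedCount + b.flowStartedCount,
      a.flowEndedCount + b.flowEndedCount,
      a.packetsCountFromSource + b.packetsCountFromSource,
      a.bytesCountFromSource + b.bytesCountFromSource,
      a.packetsCountFromDestination + b.packetsCountFromDestination,
      a.bytesCountFromDestination + b.bytesCountFromDestination)

  def finish(acc: InputFlowStatistics): InputFlowStatistics = acc

  def bufferEncoder: Encoder[InputFlowStatistics] = Encoders.product[InputFlowStatistics]
  def outputEncoder: Encoder[InputFlowStatistics] = Encoders.product[InputFlowStatistics]
}

object FlowAggregationExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("flow-agg").getOrCreate()
    import spark.implicits._

    // Made-up sample data, only to exercise the aggregator.
    val inputFlowRecords = Seq(
      InputFlowRecordV1("flowA", InputFlowStatistics(5L, 10L, 1L, 0L, 2L, 20L, 3L, 30L)),
      InputFlowRecordV1("flowA", InputFlowStatistics(3L, 12L, 0L, 1L, 4L, 40L, 5L, 50L)),
      InputFlowRecordV1("flowB", InputFlowStatistics(7L, 8L, 1L, 1L, 1L, 10L, 1L, 10L))
    ).toDS()

    // A single TypedColumn covers all eight statistics, so the arity limit of agg no longer matters.
    val aggregated = inputFlowRecords
      .groupByKey(_.FlowInformation)
      .agg(FlowStatisticsAggregator.toColumn.name("flowStatistics"))

    aggregated.show(truncate = false)
    spark.stop()
  }
}
```

The result is a Dataset[(String, InputFlowStatistics)], so the individual statistics can be read back from the second tuple element. Because every group produced by groupByKey contains at least one record, the sentinel values in zero should never appear in the output.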

Upvotes: 2
