BumbleBee

Reputation: 13

Recursively calculate columns and add to Spark Dataframe in Scala

I am new to Scala and Apache Spark. I want to calculate the mean and standard deviation of a few columns in a Spark dataframe and append the results to the source dataframe, and I am trying to do this recursively. Following is my function:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{mean, stddev_pop}
import scala.annotation.tailrec

def get_meanstd_data(mergedDF: DataFrame, grpByList: Seq[String]): DataFrame = {

  val normFactors = Iterator("factor_1", "factor_2", "factor_3", "factor_4")

  // Mean and population standard deviation of one column, grouped by grpByList
  def meanStdCalc(df: DataFrame, column: String): DataFrame = {
    val meanDF = df.select("column_1", column).groupBy(grpByList.head, grpByList.tail: _*)
      .agg(mean(column).as("mean_" + column))
    val stdDF = df.select("column_1", column).groupBy(grpByList.head, grpByList.tail: _*)
      .agg(stddev_pop(column).as("stddev_" + column))
    val finalDF = meanDF.join(stdDF, usingColumns = grpByList, joinType = "left")
    finalDF
  }

  // Applies meanStdCalc once per factor, passing each result on as the next accumulator
  def recursorFunc(df: DataFrame): DataFrame = {
    @tailrec
    def recursorHelper(acc: DataFrame): DataFrame = {
      if (!normFactors.hasNext) acc
      else recursorHelper(meanStdCalc(acc, normFactors.next()))
    }
    recursorHelper(df)
  }

  val finalDF = recursorFunc(mergedDF)
  finalDF
}

But when I call the function, the resulting dataframe contains only the mean and standard deviation of "factor_4". How do I get a dataframe with the mean and standard deviation of all the factors appended to the original dataframe?

Any help is much appreciated.

Upvotes: 1

Views: 606

Answers (1)

Javier Montón

Reputation: 5736

You probably don't need a custom recursive method here; a fold will do. Create normFactors as a List and use foldLeft:

val normFactors = List("factor_1", "factor_2", "factor_3", "factor_4")

normFactors.foldLeft(mergedDF)((df, column) => meanStdCalc(df, column))

foldLeft allows you to use the DataFrame as an accumulator.
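For completeness, here is a minimal sketch of how this could look end to end. It assumes a hypothetical helper withMeanStd (not in the question) that joins the per-group statistics back onto the accumulator; the question's meanStdCalc returns only the grouped aggregates, which is why each iteration discards the previous factor's columns.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{mean, stddev_pop}

// Hypothetical helper: compute the per-group stats for one column,
// then join them back onto df so previously added columns survive.
def withMeanStd(df: DataFrame, column: String, grpByList: Seq[String]): DataFrame = {
  val stats = df.groupBy(grpByList.head, grpByList.tail: _*)
    .agg(
      mean(column).as("mean_" + column),
      stddev_pop(column).as("stddev_" + column))
  df.join(stats, grpByList, "left")
}

val normFactors = List("factor_1", "factor_2", "factor_3", "factor_4")

// The accumulator starts as mergedDF and gains two columns per factor.
val result = normFactors.foldLeft(mergedDF)((df, column) => withMeanStd(df, column, grpByList))

Because each fold step joins onto the accumulator rather than replacing it, the final result keeps the original columns of mergedDF plus a mean_*/stddev_* pair for every factor.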

Upvotes: 0
