Merve Bozo

Reputation: 439

How can I improve the performance when completing a table with statistical methods in Apache-Spark?

I have a dataset with 10 fields and 5,000 rows. I want to complete this dataset with some statistical methods in Spark with Scala: I fill the empty cells of a field with the mean of that field if it holds continuous values, and with the most frequent value if it holds discrete values. Here is my code:

for (col <- cols) {
  val datacount = table.select(col).rdd.map(r => r(0)).filter(_ == null).count()

  if (datacount > 0) {
    if (continuous_lst contains col) {            // put mean of data to null values
      val avg = table.select(mean(col)).first()(0).asInstanceOf[Double]
      df = df.na.fill(avg, Seq(col))
    }
    else if (discrete_lst contains col) {         // put most frequent categorical value to null values
      val group_df = df.groupBy(col).count()
      val sorted = group_df.orderBy(desc("count")).take(1)

      val most_frequent = sorted.map(t => t(0))
      val most_frequent_ = most_frequent(0).toString.toDouble.toInt

      val type__ = ctype.filter(t => t._1 == col)
      val type_ = type__.map(t => t._2)

      df = df.na.fill(most_frequent_, Seq(col))
    }
  }
}

The problem is that this code runs very slowly on this data. I use spark-submit with an executor memory of 8 GB, and I call repartition(4) on the data before passing it to this function.
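
For reference, the setup described above corresponds roughly to the following sketch; fillTable is just a placeholder name for the function that contains the loop shown earlier:

// submitted with: spark-submit --executor-memory 8G ...
val repartitioned = table.repartition(4)   // 4 partitions before the fill step
val filled = fillTable(repartitioned)      // hypothetical wrapper around the for loop above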

I will need to work with larger datasets, so how can I speed up this code?

Thanks for your help.

Upvotes: 1

Views: 191

Answers (1)

Daniel de Paula

Reputation: 17872

Here is a suggestion:

import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.functions._

def most_frequent(df: DataFrame, col: Column) = {
  df.select(col).rdd                         // work on the underlying RDD so reduceByKey is available
    .map { case Row(colVal) => (colVal, 1) }
    .reduceByKey(_ + _)
    .reduce { case ((val1, cnt1), (val2, cnt2)) => if (cnt1 > cnt2) (val1, cnt1) else (val2, cnt2) }
    ._1
}

val new_continuous_cols = continuous_lst.map {
  col => coalesce(col, mean(col)).as(col.toString)
}.toArray

val new_discrete_cols = discrete_lst.map {
  col => coalesce(col, lit(most_frequent(table, col))).as(col.toString)
}.toArray

val all_new_cols = new_continuous_cols ++ new_discrete_cols
val newDF = table.select(all_new_cols: _*)

Considerations:

  • I assumed that continuous_lst and discrete_lst are lists of Column. If they are lists of String, the idea is the same, but some adjustments would be necessary;
  • Note that I used map and reduce to calculate the most frequent value of a column. That can be better than grouping by and aggregating in some cases. (Maybe there is room for improvement here, by calculating the most frequent values for all discrete columns at once? See the sketch after this list.);
  • Additionally, I used coalesce to replace the null values, instead of fill. This may result in some improvement as well. (More info about the coalesce function in the scaladoc API);
  • I cannot test this at the moment, so there may be something missing that I didn't see.
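
To make the first two points concrete, here is a rough, untested sketch; the column names and the mostFrequentAll helper are illustrative placeholders, not part of the suggestion above. It assumes continuous_lst and discrete_lst are lists of Column and shows how the most frequent value of every discrete column could be computed in a single pass over the data:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Illustrative inputs (assumption: lists of Column, as stated above)
val continuous_lst = Seq(col("age"), col("income"))
val discrete_lst   = Seq(col("gender"), col("city"))

// One pass over the data: emit ((columnName, value), 1) for every non-null cell,
// sum the counts per (column, value) pair, then keep the best value per column.
def mostFrequentAll(df: DataFrame, colNames: Seq[String]): Map[String, Any] = {
  df.select(colNames.map(col): _*).rdd
    .flatMap { row =>
      colNames.zipWithIndex.collect {
        case (c, i) if !row.isNullAt(i) => ((c, row.get(i)), 1L)
      }
    }
    .reduceByKey(_ + _)
    .map { case ((c, value), cnt) => (c, (value, cnt)) }
    .reduceByKey((a, b) => if (a._2 >= b._2) a else b)
    .mapValues(_._1)
    .collectAsMap()
    .toMap
}

// The precomputed modes could then replace the per-column most_frequent calls:
val modes = mostFrequentAll(table, discrete_lst.map(_.toString))
val new_discrete_cols = discrete_lst.map { c =>
  coalesce(c, lit(modes(c.toString))).as(c.toString)
}.toArray

This way all discrete columns are covered by a single Spark job instead of one job per column.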

Upvotes: 2
