Reputation: 439
I have a dataset with 10 fields and 5,000 rows. I want to complete this dataset with some statistical methods in Spark with Scala. If a field consists of continuous values, I fill its empty cells with the mean value of that field; if it consists of discrete values, I put the most frequent value into the empty cells. Here is my code:
for (col <- cols) {
  val datacount = table.select(col).rdd.map(r => r(0)).filter(_ == null).count()
  if (datacount > 0)
  {
    if (continuous_lst contains col) // put mean of data to null values
    {
      val avg = table.select(mean(col)).first()(0).asInstanceOf[Double]
      df = df.na.fill(avg, Seq(col))
    }
    else if (discrete_lst contains col) // put most frequent categorical value to null values
    {
      val group_df = df.groupBy(col).count()
      val sorted = group_df.orderBy(desc("count")).take(1)
      val most_frequent = sorted.map(t => t(0))
      val most_frequent_ = most_frequent(0).toString.toDouble.toInt
      val type__ = ctype.filter(t => t._1 == col)
      val type_ = type__.map(t => t._2)
      df = df.na.fill(most_frequent_, Seq(col))
    }
  }
}
The problem is that this code runs very slowly on this data. I launch it with spark-submit and --executor-memory 8G, and I call repartition(4) on the data before passing it to this function. I will need to work with much larger datasets, so how can I speed up this code?
Thanks for your help.
Upvotes: 1
Views: 191
Reputation: 17872
Here is a suggestion:
import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.functions._
def most_frequent(df: DataFrame, col: Column) = {
  df.select(col).rdd                      // drop to the RDD API so reduceByKey is available
    .map { case Row(colVal) => (colVal, 1) }
    .reduceByKey(_ + _)
    .reduce { case ((val1, cnt1), (val2, cnt2)) => if (cnt1 > cnt2) (val1, cnt1) else (val2, cnt2) }
    ._1
}
// mean(col) is an aggregate, so compute the means once up front
// and substitute them as literals inside coalesce
val means = table.select(continuous_lst.map(c => mean(c).as(c.toString)): _*).first()

val new_continuous_cols = continuous_lst.zipWithIndex.map {
  case (col, i) => coalesce(col, lit(means.getDouble(i))).as(col.toString)
}.toArray
val new_discrete_cols = discrete_lst.map {
  col => coalesce(col, lit(most_frequent(table, col))).as(col.toString)
}.toArray
val all_new_cols = new_continuous_cols ++ new_discrete_cols
val newDF = table.select(all_new_cols: _*)
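As a quick, illustrative sanity check (not part of the suggestion itself), you can count how many nulls remain in each column of newDF:

// Illustrative check: count remaining nulls per column of newDF
val null_counts = newDF.columns.map(c => sum(when(newDF(c).isNull, 1).otherwise(0)).as(c))
newDF.select(null_counts: _*).show()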
Considerations:
- continuous_lst and discrete_lst are lists of Column. If they are lists of String, the idea is the same, but some adjustments are necessary;
- I used map and reduce to calculate the most frequent value of a column. That can be better than grouping by and aggregating in some cases. (Maybe there is room for improvement here, by calculating the most frequent values for all discrete columns at once? A rough sketch of that idea follows this list.);
- I used coalesce to replace null values, instead of fill. This may result in some improvement as well. (More info about the coalesce function is in the scaladoc API.)
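A possible sketch of that single-pass idea, assuming discrete_lst is a list of Column as above and reusing the imports already shown; the names discrete_names, modes and new_discrete_cols_at_once are introduced here only for illustration:

// Sketch: compute the mode of every discrete column in one job by counting
// (columnName, value) pairs once, instead of launching a job per column.
val discrete_names = discrete_lst.map(_.toString)

val modes: Map[String, Any] = table
  .select(discrete_lst: _*)
  .rdd
  .flatMap { row =>
    discrete_names.zipWithIndex.collect {
      case (name, i) if !row.isNullAt(i) => ((name, row.get(i)), 1L)
    }
  }
  .reduceByKey(_ + _)                                // count each (column, value) pair
  .map { case ((name, value), cnt) => (name, (value, cnt)) }
  .reduceByKey((a, b) => if (a._2 >= b._2) a else b) // keep the most frequent value per column
  .mapValues(_._1)
  .collectAsMap()
  .toMap

val new_discrete_cols_at_once = discrete_lst.map {
  col => coalesce(col, lit(modes(col.toString))).as(col.toString)
}.toArray

This replaces one groupBy (or map/reduce pass) per discrete column with a single pass over the data, at the cost of shuffling (column, value) pairs for all discrete columns together.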
Upvotes: 2