Reputation: 1749
I am trying to build a large number of random forest models by group using Spark. My approach is to cache a large input data file, split it into pieces based on School_ID, cache each school's input data in memory, run a model on each piece, and then extract the labels and predictions.
model_input.cache()
val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq)
val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID).cache)
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}
def trainModel(df: DataFrame): PipelineModel = {
  val rf = new RandomForestClassifier()
  // omit some parameters
  val pipeline = new Pipeline().setStages(Array(rf))
  pipeline.fit(df)
}
val bySchoolArrayModels = bySchoolArray.map(df => trainModel(df))
schools.indices.foreach { i =>
  // Score each school's data with its own model and keep only the output columns
  val preds = bySchoolArrayModels(i).transform(bySchoolArray(i)).select("prediction", "label")
  preds.write.format("com.databricks.spark.csv").
    option("header", "true").
    save("predictions/pred" + schools(i))
}
The code works fine on a small subset, but it takes longer than I expected. It seems that every time I run an individual model, Spark reads the entire file again, so completing all the model runs takes forever. I was wondering whether I cached the data incorrectly or whether something else is wrong with the way I wrote the code.
Any suggestions would be useful. Thanks!
Upvotes: 0
Views: 2712
Reputation: 353
RDDs are immutable, so rdd.cache() returns a new RDD. You therefore need to assign the cached RDD to another variable and reuse that one. Otherwise you are not using the cached RDD.
val cachedModelInput = model_input.cache()
val schools = cachedModelInput.select("School_ID").distinct.collect.flatMap(_.toSeq)
....
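One thing that may be worth checking alongside this: cache() is lazy, so nothing is stored until the first action runs on the cached DataFrame. The sketch below is only an illustration under assumptions: the tiny in-memory DataFrame, the local master, the column values, and the CacheCheck object are hypothetical stand-ins, not the asker's data or setup. It marks the input for caching, forces materialization with count(), and then filters one school from the cached parent.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object CacheCheck {
  // Returns (is the input marked for caching?, total rows, rows for school 1)
  def run(): (Boolean, Long, Long) = {
    val spark = SparkSession.builder()
      .master("local[*]")          // assumption: local mode, for illustration only
      .appName("cache-check")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for model_input
    val modelInput = Seq((1, 0.0, 1.5), (1, 1.0, 2.5), (2, 0.0, 0.5))
      .toDF("School_ID", "label", "feature")

    val cachedModelInput = modelInput.cache()

    // A storage level other than NONE means the data is marked for caching;
    // it does not mean the data has been computed and stored yet.
    val marked = cachedModelInput.storageLevel != StorageLevel.NONE

    // An action such as count() scans the input once and populates the cache.
    val totalRows = cachedModelInput.count()

    // Later per-school filters can then be served from the cached data.
    val schoolOneRows = cachedModelInput.where($"School_ID" === 1).count()

    // Release the cached data once it is no longer needed.
    cachedModelInput.unpersist()
    spark.stop()
    (marked, totalRows, schoolOneRows)
  }
}
```

After the count() has run, the Storage tab of the Spark UI should list the cached data, which is a quick way to confirm that later jobs are not going back to the source file.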
Upvotes: 3