SH Y.

Reputation: 1749

How to Cache an Array of DataFrames/Values in Spark

I am trying to build a large number of random forest models by group using Spark. My approach is to cache a large input data file, split it into pieces based on School_ID, cache each per-school DataFrame in memory, run a model on each of them, and then extract the labels and predictions.

// Mark the full input for caching (lazy; materialized on the first action)
model_input.cache()

// Collect the distinct school IDs to the driver, then build and cache one DataFrame per school
val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq)
val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID).cache)

import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}

def trainModel(df: DataFrame): PipelineModel = {
  val rf = new RandomForestClassifier()
  // omit some parameters
  val pipeline = new Pipeline().setStages(Array(rf))
  pipeline.fit(df)
}

val bySchoolArrayModels = bySchoolArray.map(df => trainModel(df))

val preds = (0 to schools.length - 1).map(i =>
  bySchoolArrayModels(i).transform(bySchoolArray(i)).select("prediction", "label"))

// Write each school's predictions to its own CSV file
(0 to schools.length - 1).foreach { i =>
  preds(i).write.format("com.databricks.spark.csv").
    option("header", "true").
    save("predictions/pred" + schools(i))
}

The code works fine on a small subset, but it takes longer than I expected. It seems that every time I run an individual model, Spark reads the entire file, and it takes forever to complete all the model runs. I was wondering whether I cached the files incorrectly or whether something else is wrong with the way I coded it.

Any suggestions would be useful. Thanks!

Upvotes: 0

Views: 2712

Answers (1)

axlpado - Agile Lab

Reputation: 353

RDDs are immutable, so rdd.cache() returns a new RDD. You need to assign the cached RDD to another variable and then reuse that; otherwise you are not using the cached RDD.

val cachedModelInput = model_input.cache()
val schools = cachedModelInput.select("School_ID").distinct.collect.flatMap(_.toSeq)
....
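The same pattern applies through the rest of the pipeline: keep a reference to each cached per-school DataFrame and reuse that exact reference for both fitting and scoring. A minimal sketch of the remaining steps, reusing the trainModel helper and column names from the question:

// Filter from the cached input and keep the cached per-school references
val bySchoolArray = schools.map(id =>
  cachedModelInput.where($"School_ID" <=> id).cache())

// Fit one model per school, then score against the same cached DataFrame
val bySchoolArrayModels = bySchoolArray.map(trainModel)
val preds = bySchoolArray.zip(bySchoolArrayModels).map { case (df, model) =>
  model.transform(df).select("prediction", "label")
}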

Upvotes: 3
