Reputation: 196
I am creating a RandomForestClassifier using Spark 2.0 to solve a multiclass classification problem. I can train the model successfully and save it to an S3 bucket using the model.save() method. However, when I load the model back with load(), I get the following error:
`
Exception in thread "main" java.util.NoSuchElementException: Param numTrees does not exist.
at org.apache.spark.ml.param.Params$$anonfun$getParam$2.apply(params.scala:609)
at org.apache.spark.ml.param.Params$$anonfun$getParam$2.apply(params.scala:609)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.ml.param.Params$class.getParam(params.scala:608)
at org.apache.spark.ml.PipelineStage.getParam(Pipeline.scala:42)
at org.apache.spark.ml.util.DefaultParamsReader$$anonfun$getAndSetParams$1.apply(ReadWrite.scala:430)
at org.apache.spark.ml.util.DefaultParamsReader$$anonfun$getAndSetParams$1.apply(ReadWrite.scala:429)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.ml.util.DefaultParamsReader$.getAndSetParams(ReadWrite.scala:429)
at org.apache.spark.ml.classification.RandomForestClassificationModel$RandomForestClassificationModelReader.load(RandomForestClassifier.scala:310)
at org.apache.spark.ml.classification.RandomForestClassificationModel$RandomForestClassificationModelReader.load(RandomForestClassifier.scala:284)
at org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstance(ReadWrite.scala:447)
at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$4.apply(Pipeline.scala:267)
at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$4.apply(Pipeline.scala:265)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:265)
at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:341)
at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:335)
at org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstance(ReadWrite.scala:447)
at org.apache.spark.ml.tuning.CrossValidatorModel$CrossValidatorModelReader.load(CrossValidator.scala:269)
at org.apache.spark.ml.tuning.CrossValidatorModel$CrossValidatorModelReader.load(CrossValidator.scala:256)
at org.apache.spark.ml.util.MLReadable$class.load(ReadWrite.scala:227)
at org.apache.spark.ml.tuning.CrossValidatorModel$.load(CrossValidator.scala:240)
at org.apache.spark.ml.tuning.CrossValidatorModel.load(CrossValidator.scala)
`
Below is the code snippet I use to train and save the model:
val assembler = new VectorAssembler();
assembler.setInputCols(inputColumnNames);
assembler.setOutputCol("Inputs_Indexed");
//split 70:30 training and test data
val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3))
//train using RandomForest Model
val rf = new RandomForestClassifier()
.setLabelCol("Facing_Indexed")
.setFeaturesCol("Inputs_Indexed")
.setNumTrees(500);
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labels);
val stageList = new ArrayList[PipelineStage];
stageList.addAll(categoricalInputModels);
stageList.add(labelIndexer);
stageList.add(assembler);
stageList.add(rf);
stageList.add(labelConverter);
val stages = new Array[PipelineStage](stageList.size);
//convert stages list to array
stageList.toArray(stages);
val pipeline = new Pipeline().setStages(stages)
val paramGrid = new ParamGridBuilder().addGrid(rf.maxDepth, Array(3, 5, 8)).build()
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("Facing_Indexed")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val cv = new CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid)
val model = cv.fit(trainingData)
val predictions = model.transform(testData);
predictions.select("predictedLabel", "Facing", "Inputs_Indexed").show(5);
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))
model.save("s3n://xyz_path/au.model")
After the trained model is saved, I call CrossValidatorModel.load("s3n://xyz_path/au.model") in a separate Java program to load it, and that call throws the above-mentioned error. I can see the serialized model in my S3 bucket, so I am not sure where it is going wrong. Any help with this error is appreciated.
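For completeness, the loading side is essentially just the call below (a minimal sketch in Scala rather than my actual Java code; newData is a placeholder for a DataFrame with the same input columns the pipeline was trained on):
import org.apache.spark.ml.tuning.CrossValidatorModel
// Load the persisted CrossValidatorModel from the same S3 path it was saved to.
// An active SparkSession/SparkContext is required before calling load().
val model = CrossValidatorModel.load("s3n://xyz_path/au.model")
// Apply the loaded pipeline to new data; "newData" is a placeholder DataFrame.
val predictions = model.transform(newData)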
Upvotes: 0
Views: 1245
Reputation: 196
I figured out what the problem was. The AWS EMR cluster was running Spark 2.1.0, which is what I used to train the model and save it to the S3 bucket. However, my Java program was pointing to version 2.0.0 of Spark MLlib. There was a breaking change related to the numTrees Param of RandomForestClassificationModel, reported in the 2.0 to 2.1 migration guide here: http://spark.apache.org/docs/latest/ml-guide.html#from-20-to-21
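A quick way to catch this kind of mismatch is to print the runtime Spark version on both sides; the training cluster and the loading application must agree. A small sketch, assuming an active SparkSession named spark:
// Print the Spark version in both the EMR training job and the loading
// application; a mismatch (here 2.1.0 vs 2.0.0) can surface as errors like
// "Param numTrees does not exist" when reading a persisted model.
println(s"Running Spark ${spark.version}")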
So I finally updated the Spark MLlib Maven dependency in my Java project to point to version 2.1.0:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
It then complained about another missing class:
java.lang.NoClassDefFoundError: org/codehaus/commons/compiler/UncheckedCompileException
This was fixed by adding the commons-compiler dependency (Spark SQL's Janino-based code generation expects this class on the classpath):
<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>commons-compiler</artifactId>
    <version>2.7.8</version>
</dependency>
And that's how my persisted model was finally loaded successfully!
Upvotes: 1