Reputation: 79
Does anyone know possible causes of this error while trying to run the Spark MLlib ALS example from the hands-on lab provided by Databricks?
14/11/20 23:33:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/11/20 23:33:39 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
Got 27980 ratings from 24071 users on 4211 movies.
Training: 27989, validation: 0, test: 0
Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:806)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:806)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:806)
at MovieLensALS$.computeRmse(MovieLensALS.scala:149)
at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVD$sp$1.apply$mcVI$sp(MovieLensALS.scala:95)
at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVD$sp$1.apply(MovieLensALS.scala:93)
at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVD$sp$1.apply(MovieLensALS.scala:93)
at scala.collection.immutable.List.foreach(List.scala:318)
at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1.apply$mcVD$sp(MovieLensALS.scala:93)
at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1.apply(MovieLensALS.scala:93)
at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1.apply(MovieLensALS.scala:93)
at scala.collection.immutable.List.foreach(List.scala:318)
at MovieLensALS$$anonfun$main$1.apply$mcVI$sp(MovieLensALS.scala:93)
at MovieLensALS$$anonfun$main$1.apply(MovieLensALS.scala:93)
at MovieLensALS$$anonfun$main$1.apply(MovieLensALS.scala:93)
at scala.collection.immutable.List.foreach(List.scala:318)
at MovieLensALS$.main(MovieLensALS.scala:93)
at MovieLensALS.main(MovieLensALS.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
UPDATE: Sure thing! I am using the class below. It is available at https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html and https://databricks-training.s3.amazonaws.com/getting-started.html#additional-required-download. Let me know if there is something else that could help.
import java.io.File
import scala.io.Source
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}
object MovieLensALS {

  def main(args: Array[String]) {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    if (args.length != 2) {
      println("Usage: /path/to/spark/bin/spark-submit --driver-memory 2g --class MovieLensALS " +
        "target/scala-*/movielens-als-ssembly-*.jar movieLensHomeDir personalRatingsFile")
      sys.exit(1)
    }

    // set up environment
    val conf = new SparkConf()
      .setAppName("MovieLensALS")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    // load personal ratings
    val myRatings = loadRatings(args(1))
    val myRatingsRDD = sc.parallelize(myRatings, 1)

    // load ratings and movie titles
    val movieLensHomeDir = args(0)

    val ratings = sc.textFile(new File(movieLensHomeDir, "ratings.dat").toString).map { line =>
      val fields = line.split("::")
      // format: (timestamp % 10, Rating(userId, movieId, rating))
      (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
    }

    val movies = sc.textFile(new File(movieLensHomeDir, "movies.dat").toString).map { line =>
      val fields = line.split("::")
      // format: (movieId, movieName)
      (fields(0).toInt, fields(1))
    }.collect().toMap

    val numRatings = ratings.count()
    val numUsers = ratings.map(_._2.user).distinct().count()
    val numMovies = ratings.map(_._2.product).distinct().count()

    println("Got " + numRatings + " ratings from "
      + numUsers + " users on " + numMovies + " movies.")

    // split ratings into train (60%), validation (20%), and test (20%) based on the
    // last digit of the timestamp, add myRatings to train, and cache them
    val numPartitions = 4
    val training = ratings.filter(x => x._1 < 6)
      .values
      .union(myRatingsRDD)
      .repartition(numPartitions)
      .cache()
    val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8)
      .values
      .repartition(numPartitions)
      .cache()
    val test = ratings.filter(x => x._1 >= 8).values.cache()

    val numTraining = training.count()
    val numValidation = validation.count()
    val numTest = test.count()

    println("Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest)

    // train models and evaluate them on the validation set
    val ranks = List(8, 12)
    val lambdas = List(0.1, 10.0)
    val numIters = List(10, 20)
    var bestModel: Option[MatrixFactorizationModel] = None
    var bestValidationRmse = Double.MaxValue
    var bestRank = 0
    var bestLambda = -1.0
    var bestNumIter = -1
    for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
      val model = ALS.train(training, rank, numIter, lambda)
      val validationRmse = computeRmse(model, validation, numValidation)
      println("RMSE (validation) = " + validationRmse + " for the model trained with rank = "
        + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
      if (validationRmse < bestValidationRmse) {
        bestModel = Some(model)
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lambda
        bestNumIter = numIter
      }
    }

    // evaluate the best model on the test set
    val testRmse = computeRmse(bestModel.get, test, numTest)

    println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
      + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")

    // create a naive baseline and compare it with the best model
    val meanRating = training.union(validation).map(_.rating).mean
    val baselineRmse =
      math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).mean)
    val improvement = (baselineRmse - testRmse) / baselineRmse * 100
    println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")

    // make personalized recommendations
    val myRatedMovieIds = myRatings.map(_.product).toSet
    val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
    val recommendations = bestModel.get
      .predict(candidates.map((0, _)))
      .collect()
      .sortBy(- _.rating)
      .take(50)

    var i = 1
    println("Movies recommended for you:")
    recommendations.foreach { r =>
      println("%2d".format(i) + ": " + movies(r.product))
      i += 1
    }

    // clean up
    sc.stop()
  }

  /** Compute RMSE (Root Mean Squared Error). */
  def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
    val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
    val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
      .join(data.map(x => ((x.user, x.product), x.rating)))
      .values
    math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
  }

  /** Load ratings from file. */
  def loadRatings(path: String): Seq[Rating] = {
    val lines = Source.fromFile(path).getLines()
    val ratings = lines.map { line =>
      val fields = line.split("::")
      Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
    }.filter(_.rating > 0.0)
    if (ratings.isEmpty) {
      sys.error("No ratings provided.")
    } else {
      ratings.toSeq
    }
  }
}
Upvotes: 3
Views: 17920
Reputation: 21
To add on to @asu's answer: you can use .reduceOption instead of .reduce to prevent an error from occurring when it is called on an empty collection. You would then just have to handle the resulting Option, and you can throw a better error message if the RDD is never supposed to be empty. For example:
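Here is a minimal sketch of computeRmse rewritten along those lines. Note that reduceOption is defined on Scala collections rather than on the RDD API itself, so this version collects the per-pair squared errors to the driver first, which is fine at the scale of this lab:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}

def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
  val predictions = model.predict(data.map(x => (x.user, x.product)))
  val squaredErrors = predictions.map(x => ((x.user, x.product), x.rating))
    .join(data.map(x => ((x.user, x.product), x.rating)))
    .values
    .map { case (predicted, actual) => (predicted - actual) * (predicted - actual) }
  // reduceOption returns None for an empty collection instead of throwing,
  // so an empty join yields a clear error message rather than the obscure
  // "empty collection" exception from the stack trace above
  squaredErrors.collect().reduceOption(_ + _) match {
    case Some(sum) => math.sqrt(sum / n)
    case None      => sys.error("Cannot compute RMSE: predictions and ratings share no (user, product) pairs")
  }
}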
Upvotes: 0
Reputation: 1647
I came across the exact same exception. In my case, it was a bug in my code that caused the actual ratings RDD to be of size zero :) By passing an empty ratings RDD to ALS.train I definitely deserved to get UnsupportedOperationException: empty collection.
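A quick count before calling ALS.train makes this failure mode obvious. A minimal sketch, where training, rank, numIter, and lambda stand for the corresponding values in the question's code:

// Fail fast with a readable message instead of the opaque "empty collection"
// exception that only surfaces later inside reduce.
val numTraining = training.count()
require(numTraining > 0, "training RDD is empty; check the upstream parsing/filters")
println(s"Training ALS on $numTraining ratings")
val model = ALS.train(training, rank, numIter, lambda)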
Upvotes: 1
Reputation: 267
I ran into the same problem working with the same example. The problem is that the training data I was using wasn't large enough and didn't have enough repeated values. The ALS model can only predict (user, product) ID pairs that were present in the training data (it is somewhat different from other machine learning algorithms in that way). So if every pair in the validation set contains an ID that wasn't in the training set, the predictions RDD will be empty (since none of those pairs can be predicted), and the reduce transformation in the RMSE method will throw this exception. To avoid this you should:
A) not use this algorithm without sufficient training data, and
B) check, before entering the "finding the best model" loop, that your validation set will actually work against your training set; see the sketch after this list.
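For example, the check in B) could filter the validation set down to pairs the model can actually score. A sketch under these assumptions: restrictToSeenIds is a hypothetical helper name, and training/validation are the RDD[Rating] splits from the question's code:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.Rating

// Keep only validation ratings whose user AND product both occur in training;
// ALS cannot predict a pair when either ID was never seen during training.
def restrictToSeenIds(training: RDD[Rating], validation: RDD[Rating]): RDD[Rating] = {
  val trainedUsers    = training.map(r => (r.user, ())).distinct()
  val trainedProducts = training.map(r => (r.product, ())).distinct()
  validation
    .map(r => (r.user, r))
    .join(trainedUsers)                          // drops ratings from unseen users
    .map { case (_, (r, _)) => (r.product, r) }
    .join(trainedProducts)                       // drops ratings for unseen products
    .map { case (_, (r, _)) => r }
}

// If this count is zero, every iteration of the model-selection loop will fail:
// val usable = restrictToSeenIds(training, validation)
// require(usable.count() > 0, "no validation pair is predictable from this training set")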
Lastly, if you are productizing this algorithm, make sure that you don't use the best model returned by this loop as-is, because it likely hasn't seen all of your user and product IDs. In that case you are restricting which new users and products you can predict on. What I would recommend is to use this logic to discern the correct training parameters and then, using those parameters, train the model on all the data and use that.
Upvotes: 1
Reputation: 569
Probably it's because of some filters you (or the computeRmse method) are using, so the reduce method gets called on an empty collection/RDD and the "empty collection" exception is thrown. Try double-checking the filters and the computeRmse() function.
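You can reproduce this failure mode directly in the spark-shell (a minimal sketch, assuming an existing SparkContext named sc). Note that the question's own log already shows "validation: 0", so computeRmse ends up reducing an empty RDD:

// An empty RDD stands in for the empty validation split from the log above.
val empty = sc.parallelize(Seq.empty[Double])

// Uncommenting the next line throws
// java.lang.UnsupportedOperationException: empty collection
// empty.reduce(_ + _)

// Guarding on count() (or fixing the filters upstream) avoids the crash:
if (empty.count() == 0) println("RDD is empty; skipping the RMSE computation")
else println(math.sqrt(empty.reduce(_ + _)))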
Upvotes: 4