Reputation: 31
I am trying to build a recommendation system by using Spark ML ALS where data are as follows
"User-ID";"ISBN "; "Book-Rating"
276725;034545104;0
276726;0155061224;5
276727;0446520802;0
276729;052165615;3
276729;0521795028;6
I am using Spark 2.1.0 and mongoldb to load data. Here is my piece of code that defines the dataframe and his schema after casting.
/*
* Chargement de données de rating
*/
val dfrating = spark.loadFromMongoDB(readConfig)
val bookRatings = dfrating.selectExpr("cast(User_ID as Long) User_ID " ,"cast(ISBN as Long) ISBN ", "Book_Rating")
bookRatings.printSchema()
val als = new ALS().setMaxIter(10).setRegParam(0.01).setUserCol("User_ID").setItemCol("ISBN").setRatingCol("Book_Rating")
val model = als.fit(training)
After compiling, I have got
root
|-- User_ID: long (nullable = true)
|-- ISBN: long (nullable = true)
|-- Book_Rating: integer (nullable = true)
+-------+----------+-----------+
|User_ID| ISBN|Book_Rating|
+-------+----------+-----------+
| 215| 61030147| 6|
| 5750|1853260045| 0|
| 11676| 743244249| 0|
| 11676|1551665700| 0|
Caused by: java.lang.IllegalArgumentException: **ALS **only supports values in Integer range for column**s User_ID and ISBN. ****Value** 8.477024456E9 **was out of Integer range.******
at org.apache.spark.ml.recommendation.ALSModelParams$$anonfun$1.apply$mcID$sp(ALS.scala:87)
Is there any other solution to get things running? I have got these suggestions (How to use mllib.recommendation if the user ids are string instead of contiguous integers? How to use long user ID in PySpark ALS and also Non-integer ids in Spark MLlib ALS) for the same problem, but I don't know how to begin.
Here is what I do.
val isbn_als = new StringIndexer()
.setHandleInvalid("skip")
.setInputCol("ISBN")
.setOutputCol("ISBN_als")
.fit(uRatings)
val isbn_als_reverse = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
val als = new ALS().setMaxIter(10).setRegParam(0.01).setUserCol("User_ID").setItemCol("ISBN_als").setRatingCol("Book_Rating")
/*
* On définit l'ordre des opérations à effectuer
*/
println("On passe au Pipeline")
val alsPipeline = new Pipeline().setStages(Array(isbn_als, als, isbn_als_reverse))
/*
* On construit le modèle de recommandation à partir des données de Training
*/
println("On passe à la construction du modèle")
val alsModel = alsPipeline.fit(training)
/*
* On exécute le modèle sur les données de Test, puis on affiche un échantillon de prédictions
*/
println("On exécute le modèle sur les données de Test")
val alsPredictions = alsModel.transform(test).na.drop()
println("Affichage des prédictions")
alsPredictions.select($"User_ID",$"ISBN", $"Book_Rating", $"prediction").show(20)
But I have got this exception when I use IndexToString()
on the pipeline.
On passe au Pipeline
On passe à la construction du modèle
On exécute le modèle sur les données de Test
Exception in thread "main" java.lang.ClassCastException: org.apache.spark.ml.attribute.UnresolvedAttribute$ cannot be cast to org.apache.spark.ml.*attribute.NominalAttribute*
at org.apache.spark.ml.feature.IndexToString.transform(StringIndexer.scala:313)
at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
When I do not use IndexToString()
, I have got a negative prediction.
+-------+---------+-----------+-------------+
|User_ID| ISBN|Book_Rating| prediction|
+-------+---------+-----------+-------------+
| 140340|786881852| 10| 6.9798374|
| 127327|786881852| 0|-1.2718141E-4|
| 103336|786881852| 0| 1.2374072|
| 138578|786881852| 9| 8.200257|
| 172742|786881852| 0| -1.3278971|
| 31909|786881852| 6| 5.997123|
| 69554|786881852| 5| 2.819587|
| 173650|786881852| 0| 0.42850634|
I suppose the negative prediction is due to IndexToString()
that is not used. If so, how to use IndexToString()
on the pipeline?
Upvotes: 2
Views: 2472
Reputation: 9328
The exception you get is emitted by the IndexToString part, which is misconfigured. You make it decode the prediction back to a String, but the prediction is not a product (ISBN), it is a rating : ALS predicts ratings, not products.
Which in turns means you do not need an inverser.
See the following, working sample :
scala> import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.Pipeline
scala> import org.apache.spark.ml.recommendation._
import org.apache.spark.ml.recommendation._
scala> import org.apache.spark.ml.feature._
import org.apache.spark.ml.feature._
// This is just a helper
scala> case class Rating(user: Long, isbn: String, rating: Double)
defined class Rating
// Let's create 2 books, 3 users, 3 ratings to train the model
scala> val rawRatings = Seq(Rating(1, "1234567890123", 1), Rating(2, "12345678901234", 2), Rating(3, "12345678901234", 3))
rawRatings: Seq[Rating] = List(Rating(1,1234567890123,1.0), Rating(2,12345678901234,2.0), Rating(3,12345678901234,3.0))
scala> val ratings = spark.createDataFrame(rawRatings)
scala> val isbn_als = new StringIndexer().setInputCol("isbn").setOutputCol("isbnIDX")
isbn_als: org.apache.spark.ml.feature.StringIndexer = strIdx_53d752f20587
scala> val als = new ALS().setUserCol("user").setItemCol("isbnIDX").setRatingCol("rating")
als: org.apache.spark.ml.recommendation.ALS = als_41eff9ae835d
scala> val stages = Array(isbn_als, als)
stages: Array[org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable}}] = Array(strIdx_53d752f20587, als_41eff9ae835d, idxToStr_9b2ca994074f)
// Do the actual training
scala> val pipeline = new Pipeline().setStages(stages)
pipeline: org.apache.spark.ml.Pipeline = pipeline_5f05891139b6
scala> val pipeModel = pipeline.fit(ratings)
pipeModel: org.apache.spark.ml.PipelineModel = pipeline_5f05891139b6
// And make predictions for any user/book combination
scala> case class UserBook(user: Long, isbn: String)
defined class UserBook
scala> val testSet = Seq(UserBook(1, "12345678901234"))
testSet: Seq[UserBook] = List(UserBook(1,12345678901234))
scala> val testDF = spark.createDataFrame(testSet)
testDF: org.apache.spark.sql.DataFrame = [user: bigint, isbn: string]
scala> pipeModel.transform(testDF).show
+----+--------------+-------+----------+
|user| isbn|isbnIDX|prediction|
+----+--------------+-------+----------+
| 1|12345678901234| 0.0| 0.7389956|
+----+--------------+-------+----------+
Here, "prediction" is rating the prediction for book ISBN 12345678901234 for the user 1. The isbnIDX is used for computational purposes only and need not be reversed, because we already have the isbn in the dataframe.
Upvotes: 4