Reputation: 9
I've been studying Spark for a while, but today I got stuck. I'm working on a recommendation model using the Audioscrobbler dataset.
I have an ALS-based model and the following definition for making recommendations:
def makeRecommendations(model: ALSModel, userID: Int, howMany: Int): DataFrame = {
  val toRecommend = model.itemFactors.select($"id".as("artist")).withColumn("user", lit(userID))
  model.transform(toRecommend).
    select("artist", "prediction", "user").
    orderBy($"prediction".desc).
    limit(howMany)
}
It generates the expected output, but now I would like to create a new list of DataFrames from the predictions DataFrame and the user data DataFrame.
Each new DataFrame should contain the predicted value from the predictions DataFrame and a "listened" column that is 1 if the user listened to the artist and 0 if not, something like this:
I tried the following solution:
val recommendationsSeq = someUsers.map { userID =>
  // Get the artists for this user in testData
  val artistsOfUser = testData.where($"user".===(userID)).select("artist").rdd.map(r => r(0)).collect.toList
  // Recommendations for each user
  val recoms = makeRecommendations(model, userID, numRecom)
  // Add a "listened" column: 1 if the artist is in the test set for the user, 0 otherwise
  val recomOutput = recoms.withColumn("listened", when($"artist".isin(artistsOfUser: _*), 1.0).otherwise(0.0)).drop("artist")
  recomOutput
}.toSeq
But it's very time-consuming when generating recommendations for more than 30 users. I believe there's a better way to do it.
Could someone give me some ideas?
Thanks,
Upvotes: 0
Views: 56
Reputation: 2451
You can try joining the DataFrames, then using groupBy and count:
scala> val df1 = Seq((1205,0.9873411,1000019)).toDF("artist","prediction","user")
scala> df1.show()
+------+----------+-------+
|artist|prediction|   user|
+------+----------+-------+
|  1205| 0.9873411|1000019|
+------+----------+-------+
scala> val df2 = Seq((1000019,1205,40)).toDF("user","artist","playcount")
scala> df2.show()
+-------+------+---------+
|   user|artist|playcount|
+-------+------+---------+
|1000019|  1205|       40|
+-------+------+---------+
scala> df1.join(df2,Seq("artist","user")).groupBy('prediction).count().show()
+----------+-----+
|prediction|count|
+----------+-----+
| 0.9873411|    1|
+----------+-----+
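If what you need is the 0/1 "listened" column from the question rather than a count, the same join idea might work as a left outer join. The following is only a sketch: it assumes the predictions for all users of interest are already in a single DataFrame (for example, by unioning the per-user results), and the names `ListenedFlagSketch`, `withListened` and the helper column `hit` are made up for illustration. The second prediction row is toy data.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, when}

object ListenedFlagSketch {
  // Adds a "listened" column: 1.0 if the (user, artist) pair appears in userData, else 0.0.
  // Hypothetical helper, not part of Spark or of the original code.
  def withListened(predictions: DataFrame, userData: DataFrame): DataFrame = {
    // Distinct (user, artist) pairs, tagged with a marker column;
    // after a left outer join the marker is null for pairs that were never listened to.
    val listenedPairs = userData
      .select("user", "artist")
      .distinct()
      .withColumn("hit", lit(1.0))

    predictions
      .join(listenedPairs, Seq("user", "artist"), "left_outer")
      .withColumn("listened", when(col("hit").isNotNull, 1.0).otherwise(0.0))
      .drop("hit")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("listened-flag-sketch")
      .getOrCreate()
    import spark.implicits._

    // First row reuses the values from the example above; the second row is invented.
    val predictions = Seq(
      (1205, 0.9873411, 1000019),
      (4242, 0.81, 1000019)
    ).toDF("artist", "prediction", "user")
    val userData = Seq((1000019, 1205, 40)).toDF("user", "artist", "playcount")

    withListened(predictions, userData)
      .select("user", "prediction", "listened")
      .show()

    spark.stop()
  }
}
```

Because the flag is computed in one join for every user at once, there is no per-user `collect` to the driver, which is likely where most of the time goes in the loop from the question.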
Upvotes: 1