Reputation: 21
I am writing a movie recommender in PySpark. The recommendation output from ALS is an array of structs; after selecting the fields, the movie_id column holds one array and the rating column holds another. But when I explode the two columns individually into temporary dataframes and then join them back on 'user_id', the 'inner' join produces a cartesian product, since every movie row matches every rating row for the same user.
from pyspark.sql import functions as F
from pyspark.sql.functions import col

user_recs_one = user_recs.where(user_recs.user_id == 1)
user_recs_one.show(truncate=False)
+-------+-------------------------------------------------------+
|user_id|recommendations |
+-------+-------------------------------------------------------+
|1 |[[1085, 6.1223927], [1203, 6.0752907], [745, 5.954721]]|
+-------+-------------------------------------------------------+
user_recs_one
DataFrame[user_id: int, recommendations: array<struct<movie_id:int,rating:float>>]
user_recs_one = user_recs_one.select("user_id", "recommendations.movie_id", "recommendations.rating")
user_recs_one.show(truncate=False)
+-------+-----------------+--------------------------------+
|user_id|movie_id |rating |
+-------+-----------------+--------------------------------+
|1 |[1085, 1203, 745]|[6.1223927, 6.0752907, 5.954721]|
+-------+-----------------+--------------------------------+
user_recs_one
DataFrame[user_id: int, movie_id: array<int>, rating: array<float>]
x = user_recs_one.select("user_id", F.explode(col("movie_id")).alias("movie_id"))
x.show()
+-------+--------+
|user_id|movie_id|
+-------+--------+
| 1| 1085|
| 1| 1203|
| 1| 745|
+-------+--------+
y = user_recs_one.select("user_id", F.explode(col("rating")).alias("rating"))
y.show()
+-------+---------+
|user_id| rating|
+-------+---------+
| 1|6.1223927|
| 1|6.0752907|
| 1| 5.954721|
+-------+---------+
x.join(y, on='user_id', how='inner').show()
+-------+--------+---------+
|user_id|movie_id| rating|
+-------+--------+---------+
| 1| 1085|6.1223927|
| 1| 1085|6.0752907|
| 1| 1085| 5.954721|
| 1| 1203|6.1223927|
| 1| 1203|6.0752907|
| 1| 1203| 5.954721|
| 1| 745|6.1223927|
| 1| 745|6.0752907|
| 1| 745| 5.954721|
+-------+--------+---------+
Upvotes: 0
Views: 615
Reputation: 21
Since my result set was very small, this is what I ended up implementing:
user_recs_one = user_recs_one.select("user_id", "recommendations.movie_id", "recommendations.rating")
user_recs_one.show(truncate=False)
+-------+-----------------+--------------------------------+
|user_id|movie_id |rating |
+-------+-----------------+--------------------------------+
|1 |[1085, 1203, 745]|[6.1223927, 6.0752907, 5.954721]|
+-------+-----------------+--------------------------------+
user_recs_one
DataFrame[user_id: int, movie_id: array<int>, rating: array<float>]
Introduce a Sequence Id:
To join the recommended movies to the recommended ratings we need to introduce an additional id column. To ensure that the values in the id column are increasing we use the monotonically_increasing_id() function. This function is guaranteed to produce increasing numbers, but not sequential increasing numbers when the dataframe has more than one partition, so we also repartition each exploded dataframe into a single partition.
only_movies = user_recs_one.select("user_id", F.explode(col("movie_id")).alias("movie_id"))
only_movies = only_movies.repartition(1).withColumn('id', F.monotonically_increasing_id())
only_movies = only_movies.select('id', 'user_id', 'movie_id')
only_movies.show()
+---+-------+--------+
| id|user_id|movie_id|
+---+-------+--------+
| 0| 1| 1085|
| 1| 1| 1203|
| 2| 1| 745|
+---+-------+--------+
only_ratings = user_recs_one.select("user_id", F.explode(col("rating")).alias("rating"))
only_ratings = only_ratings.repartition(1).withColumn('id', F.monotonically_increasing_id())
only_ratings = only_ratings.select('id', 'user_id', 'rating')
only_ratings.show()
+---+-------+---------+
| id|user_id| rating|
+---+-------+---------+
| 0| 1|6.1223927|
| 1| 1|6.0752907|
| 2| 1| 5.954721|
+---+-------+---------+
only_movies.join(only_ratings.drop('user_id'), on='id', how='inner').drop('id').show()
+-------+--------+---------+
|user_id|movie_id| rating|
+-------+--------+---------+
| 1| 1085|6.1223927|
| 1| 1203|6.0752907|
| 1| 745| 5.954721|
+-------+--------+---------+
Upvotes: 1