TTuPC

Reputation: 43

Calculate cosine similarity with a dataframe in Scala Spark

I have a dataframe in this form:

+-------+-------+------------------+-------+----+
|userId1|movieId|              rat1|userId2|rat2|
+-------+-------+------------------+-------+----+
|      1|      1|               1.0|      2| 1.0|
|      1|      2|               1.0|      2| 2.0|
|      1|      3|               2.0|      2| 3.0|
|      2|      1|               1.0|      3| 0.0|
|      2|      2|               2.0|      3| 0.0|
|      2|      3|               3.0|      3| 0.0|
|      3|      1|               0.0|      1| 1.0|
|      3|      2|               0.0|      1| 1.0|

....

Where rat1 and rat2 are the ratings of user1 and user2. What I want is to calculate the cosine similarity between two users. My idea is to extract per-user rating arrays from this dataframe and then calculate the cosine similarity on them, for example:

arrayUser1 = (1,1,2)
arrayUser2 = (1,2,3)
arrayUser3 = (0,0,0)

The problem is that I don't know how to extract these arrays. Does someone have a solution, or tips for calculating the similarity in a better way?
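For reference, the similarity I am after is cos(u, v) = (u · v) / (‖u‖ ‖v‖); for arrayUser1 and arrayUser2 above that would be (1·1 + 1·2 + 2·3) / (√6 · √14) = 9 / √84 ≈ 0.982.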

Upvotes: 0

Views: 311

Answers (2)

mck

Reputation: 42332

You can normalize rat1 and rat2 by each user's L2 norm first, multiply them, then group by userId1 and userId2 and sum up the products:

df.show
+-------+-------+----+-------+----+
|userId1|movieId|rat1|userId2|rat2|
+-------+-------+----+-------+----+
|      1|      1| 1.0|      2| 1.0|
|      1|      2| 1.0|      2| 2.0|
|      1|      3| 2.0|      2| 3.0|
|      2|      1| 1.0|      3| 0.0|
|      2|      2| 2.0|      3| 0.0|
|      2|      3| 3.0|      3| 0.0|
|      3|      1| 0.0|      1| 1.0|
|      3|      2| 0.0|      1| 1.0|
|      3|      3| 0.0|      1| 2.0|
+-------+-------+----+-------+----+

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._   // for the $"..." column syntax

val cos_sim = df.withColumn(
    "rat1",    // normalize rat1 by user1's L2 norm
    coalesce(
        $"rat1" / sqrt(sum($"rat1" * $"rat1").over(Window.partitionBy("userId1"))),
        lit(0)    // a zero-norm user divides by null; fall back to 0
    )
).withColumn(
    "rat2",    // normalize rat2 by user2's L2 norm
    coalesce(
        $"rat2" / sqrt(sum($"rat2" * $"rat2").over(Window.partitionBy("userId2"))),
        lit(0)
    )
).withColumn(
    "rat1_times_rat2",    // elementwise product of the normalized ratings
    $"rat1" * $"rat2"
).groupBy("userId1", "userId2").agg(sum("rat1_times_rat2").alias("cos_sim"))

cos_sim.show
+-------+-------+-----------------+
|userId1|userId2|          cos_sim|
+-------+-------+-----------------+
|      3|      1|              0.0|
|      2|      3|              0.0|
|      1|      2|0.981980506061966|
+-------+-------+-----------------+
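This works because cos(u, v) = (u · v) / (‖u‖ ‖v‖): once each rating is divided by its user's L2 norm, the sum of the products is exactly the cosine similarity, and coalesce turns the all-zero user 3 (whose norm is 0, so the division yields null) into 0. A quick plain-Scala sanity check for the 1-vs-2 pair above:

// user 1 rated (1.0, 1.0, 2.0), user 2 rated (1.0, 2.0, 3.0)
val u = Seq(1.0, 1.0, 2.0)
val v = Seq(1.0, 2.0, 3.0)
val dot = u.zip(v).map { case (a, b) => a * b }.sum             // 9.0
val sim = dot / (math.sqrt(u.map(x => x * x).sum) *
                 math.sqrt(v.map(x => x * x).sum))              // 0.981980506061966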

Upvotes: 1

SooryaS

Reputation: 42

You can use the DataFrame groupBy operation and do a collect_set aggregation.

Below is the sample code.

scala> someDF.show
+-------+-------+----+-------+----+
|userId1|movieId|rat1|userId2|rat2|
+-------+-------+----+-------+----+
|      1|      1| 1.0|      2| 1.0|
|      1|      2| 1.0|      2| 2.0|
|      1|      3| 2.0|      2| 3.0|
|      2|      1| 1.0|      3| 0.0|
|      2|      2| 2.0|      3| 0.0|
|      2|      3| 3.0|      3| 0.0|
|      3|      1| 0.0|      1| 1.0|
|      3|      2| 0.0|      1| 1.0|
+-------+-------+----+-------+----+

scala> someDF.groupBy("userId1").agg(collect_set("rat1").alias("ratinglist")).show
+-------+---------------+
|userId1|     ratinglist|
+-------+---------------+
|      1|     [2.0, 1.0]|
|      3|          [0.0]|
|      2|[2.0, 1.0, 3.0]|
+-------+---------------+
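One caveat: collect_set de-duplicates and does not preserve order, which is why user 1's ratings [1.0, 1.0, 2.0] collapsed to [2.0, 1.0] above. To build rating vectors aligned by movieId, a safer variant (a sketch against the same someDF; cosine is a hypothetical helper, not a built-in) is to collect (movieId, rating) structs and sort them:

import org.apache.spark.sql.functions._

// Collect (movieId, rating) pairs, sort by movieId, then keep only the ratings
// so every user's vector lines up on the same movie order.
val vectors = someDF.groupBy("userId1")
  .agg(sort_array(collect_list(struct($"movieId", $"rat1"))).alias("pairs"))
  .select($"userId1", $"pairs.rat1".alias("ratinglist"))

// Hypothetical helper: cosine similarity of two equal-length vectors,
// returning 0.0 when either norm is zero (like user 3 here).
val cosine = udf { (a: Seq[Double], b: Seq[Double]) =>
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  val na  = math.sqrt(a.map(x => x * x).sum)
  val nb  = math.sqrt(b.map(x => x * x).sum)
  if (na == 0.0 || nb == 0.0) 0.0 else dot / (na * nb)
}

Joining vectors with a copy of itself on distinct user ids and applying cosine to the two rating lists then gives the pairwise similarities.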

Upvotes: 0
