Reputation: 2132
In the ALS example given in PySpark as per this documentation (http://spark.apache.org/docs/latest/ml-collaborative-filtering.html), the data used has explicit feedback in one column. The data is like this:

| User | Item | Rating |
| --- | --- | --- |
| First | A | 2 |
| Second | B | 3 |
However, in my case I have implicit feedback in multiple columns, like this:

| User | Item | Clicks | Views | Purchase |
| --- | --- | --- | --- | --- |
| First | A | 20 | 35 | 3 |
| Second | B | 3 | 12 | 0 |
I know we can use implicit feedback by setting implicitPrefs to True. However, it only accepts a single rating column. How can I use multiple columns?
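To make it concrete, the single-column usage I am referring to is roughly this (a minimal sketch; the ratings DataFrame and its userId/itemId/rating columns are just placeholders):

from pyspark.ml.recommendation import ALS

# ratings: a DataFrame with numeric userId/itemId columns and one numeric feedback column
als = ALS(userCol='userId', itemCol='itemId', ratingCol='rating',
          implicitPrefs=True, coldStartStrategy='drop')
model = als.fit(ratings)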
I found this question: How to manage multiple positive implicit feedbacks? However, it is not related to Spark or the Alternating Least Squares method. Do I have to manually assign a weighting scheme as per that answer, or is there a better solution in PySpark?
Upvotes: 0
Views: 340
Reputation: 342
I have researched your issue thoroughly and haven't found a way to pass multiple columns to ALS; most such problems are solved by manually weighting the feedback columns and creating a single Rating column.
Below is my solution
Extract the smallest non-zero value of each column and divide all elements of that column by it.
For example, the minimum non-zero value of the Purchase column is 3,
so the values become 3/3, 10/3, 20/3, etc.
Below is the formula for Rating
Rating = 60% of Purchase + 30% of Clicks + 10% of Views
data.show()
+------+----+------+-----+--------+
| User|Item|Clicks|Views|Purchase|
+------+----+------+-----+--------+
| First| A| 20| 35| 3|
|Second| B| 3| 12| 0|
| Three| C| 4| 15| 20|
| Four| D| 5| 16| 10|
+------+----+------+-----+--------+
from pyspark.sql.functions import col, round

# smallest non-zero value of each column (sorted ascending, so first() is the minimum)
df1 = data.sort('Purchase').select('Purchase')
df = df1.filter(df1.Purchase > 0)
purch_index = df.first()['Purchase']
df2 = data.sort('Views').select('Views')
df2 = df2.filter(df2.Views > 0)
Views_index = df2.first()['Views']
df3 = data.sort('Clicks').select('Clicks')
df3 = df3.filter(df3.Clicks > 0)
CLicks_index = df3.first()['Clicks']

# scale each column by its smallest non-zero value and round to the nearest integer
semi_rawdf = data.withColumn('Clicks', round(col('Clicks')/CLicks_index)) \
    .withColumn('Views', round(col('Views')/Views_index)) \
    .withColumn('Purchase', round(col('Purchase')/purch_index))
semi_rawdf.show()
+------+----+------+-----+--------+
| User|Item|Clicks|Views|Purchase|
+------+----+------+-----+--------+
| First| A| 7.0| 3.0| 1.0|
|Second| B| 1.0| 1.0| 0.0|
| Three| C| 1.0| 1.0| 7.0|
| Four| D| 2.0| 1.0| 3.0|
+------+----+------+-----+--------+
from pyspark.sql.types import DecimalType
from decimal import Decimal

# weighted Rating: 30% of Clicks plus the rounded 10% of Views and the rounded 60% of Purchase
refined_df = semi_rawdf.withColumn('Rating',((col('Clicks')*0.3)+round(col('Views')*0.1)+round(col('Purchase')*0.6)))
refined_df = refined_df.withColumn('Rating', col('Rating').cast(DecimalType(6,2)))
refined_df.select('User','Item','Rating').show()
+------+----+------+
| User|Item|Rating|
+------+----+------+
| First| A| 3.10|
|Second| B| 0.30|
| Three| C| 4.30|
| Four| D| 2.60|
+------+----+------+
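From here the weighted Rating column can be passed to ALS as implicit feedback. A rough sketch (the UserId/ItemId index columns are my own additions, since ALS expects numeric ids):

from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS

# ALS needs numeric user/item ids, so index the string columns first
indexed = StringIndexer(inputCol='User', outputCol='UserId').fit(refined_df).transform(refined_df)
indexed = StringIndexer(inputCol='Item', outputCol='ItemId').fit(indexed).transform(indexed)
indexed = indexed.withColumn('Rating', indexed['Rating'].cast('float'))

als = ALS(userCol='UserId', itemCol='ItemId', ratingCol='Rating',
          implicitPrefs=True, coldStartStrategy='drop')
model = als.fit(indexed)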
Upvotes: 1