Reputation: 1905
When creating an ALS model, we can extract a userFactors DataFrame and an itemFactors DataFrame. These DataFrames contain a column with an Array.
I would like to generate some random data and union it to the userFactors DataFrame.
Here is my code:
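import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.DataFrame
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

val df1: DataFrame = Seq((123, 456, 4.0), (123, 789, 5.0), (234, 456, 4.5), (234, 789, 1.0)).toDF("user", "item", "rating")
val model1 = new ALS()
  .setImplicitPrefs(true)
  .fit(df1)
val iF = model1.itemFactors
val uF = model1.userFactors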
I then create a random DataFrame using a VectorAssembler with this function:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions.rand

def makeNew(df: DataFrame, rank: Int): DataFrame = {
  var df_dummy = df
  var inputCols: Array[String] = Array()
  // Add one column of random values per index and remember its name
  for (i <- 0 to rank) {
    df_dummy = df_dummy.withColumn("feature".concat(i.toString), rand())
    inputCols = inputCols :+ "feature".concat(i.toString)
  }
  // Assemble the random columns into a single Vector column
  val assembler = new VectorAssembler()
    .setInputCols(inputCols)
    .setOutputCol("userFeatures")
  val output = assembler.transform(df_dummy)
  output.select("user", "userFeatures")
}
I then create the DataFrame with new user IDs and add the random vectors and bias:
val usersDf: DataFrame = Seq((567), (678)).toDF("user")
var usersFactorsNew: DataFrame = makeNew(usersDf, 20)
The problem arises when I union the two DataFrames.
usersFactorsNew.union(uF)
produces the error:
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. struct<type:tinyint,size:int,indices:array<int>,values:array<double>> <> array<float> at the second column of the second table;;
If I print the schema, the uF DataFrame has a feature vector of type Array[Float] and the usersFactorsNew DataFrame has a feature vector of type Vector.
My question is how to change the type of the Vector to an Array in order to perform the union.
I tried writing this udf with little success:
val toArr: org.apache.spark.ml.linalg.Vector => Array[Double] = _.toArray
val toArrUdf = udf(toArr)
Perhaps the VectorAssembler is not the best option for this task. However, at the moment, it is the only option I have found. I would love to get some recommendations for something better.
Upvotes: 1
Views: 1510
Reputation: 41957
Your overall process is correct, and even the udf function works. All you need to do is change the last line of the makeNew function so that it applies the udf:
import org.apache.spark.sql.functions.col

def makeNew(df: DataFrame, rank: Int): DataFrame = {
  var df_dummy = df
  var inputCols: Array[String] = Array()
  for (i <- 0 to rank) {
    df_dummy = df_dummy.withColumn("feature".concat(i.toString), rand())
    inputCols = inputCols :+ "feature".concat(i.toString)
  }
  val assembler = new VectorAssembler()
    .setInputCols(inputCols)
    .setOutputCol("userFeatures")
  val output = assembler.transform(df_dummy)
  // Convert the assembled Vector to an array with the toArrUdf from the question
  output.select(col("id"), toArrUdf(col("userFeatures")).as("features"))
}
With that change you should be good to go. Note that I created usersDf with an id column rather than a user column:
val usersDf: DataFrame = Seq((567), (678)).toDF("id")
var usersFactorsNew: DataFrame = makeNew(usersDf, 20)
usersFactorsNew.union(uF).show(false)
you should be getting
+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |features |
+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|567|[0.8259185719733708, 0.327713892339658, 0.049547223031371046, 0.056661808506210054, 0.5846626163454274, 0.038497936270104005, 0.8970865088803417, 0.8840660648882804, 0.837866669938156, 0.9395263094918058, 0.09179528484355126, 0.4915430644129799, 0.11083447052043116, 0.5122858182953718, 0.4302683812966408, 0.3862741815833828, 0.6189322403095068, 0.3000371006293433, 0.09331299668168902, 0.7421838728601371, 0.855867963988993]|
|678|[0.7686514248005568, 0.5473580740023187, 0.072945344124282, 0.36648594574355287, 0.9780202082328863, 0.5289221651923784, 0.3719451099963028, 0.2824660794505932, 0.4873197501260199, 0.9364676464120849, 0.011539929543513794, 0.5240615794930654, 0.6282546154521298, 0.995256022569878, 0.6659179561266975, 0.8990775317754092, 0.08650071017556926, 0.5190186149992805, 0.056345335742325475, 0.6465357505620791, 0.17913532817943245] |
|123|[0.04177388548851013, 0.26762014627456665, -0.19617630541324615, 0.34298020601272583, 0.19632814824581146, -0.2748605012893677, 0.07724890112876892, 0.4277132749557495, 0.1927199512720108, -0.40271613001823425] |
|234|[0.04139673709869385, 0.26520395278930664, -0.19440513849258423, 0.3398836553096771, 0.1945556253194809, -0.27237895131111145, 0.07655145972967148, 0.42385169863700867, 0.19098000228405, -0.39908021688461304] |
+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
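One detail visible in the output above: the two random rows hold 21 values even though makeNew was called with 20, because `0 to rank` is an inclusive range. The sketch below is plain Scala with no Spark involved, and the object and method names are made up for illustration. It only shows how many column names each kind of range produces.

```scala
object RangeBounds {
  // Names generated the way makeNew does it: `0 to rank` includes rank itself,
  // so this yields rank + 1 names.
  def inclusiveCols(rank: Int): Seq[String] =
    (0 to rank).map(i => "feature".concat(i.toString))

  // Hypothetical variant: `0 until rank` stops before rank,
  // so this yields exactly rank names.
  def exclusiveCols(rank: Int): Seq[String] =
    (0 until rank).map(i => "feature".concat(i.toString))
}
```

If the random vectors are meant to have exactly rank entries, switching the loop in makeNew to `until` would be one way to get there. The union itself does not depend on this, since the array length is not part of the column type.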
Upvotes: 1
Reputation: 28352
Instead of creating a dummy dataframe and using VectorAssembler to generate a random feature vector, you can simply use a UDF directly. The userFactors from the ALS model will return an Array[Float], so the output from the UDF should match that.
import org.apache.spark.sql.functions.{lit, udf}
import scala.util.Random

val createRandomArray = udf((rank: Int) => {
  Array.fill(rank)(Random.nextFloat())
})
Note that this will give numbers in the interval [0.0, 1.0), the same as the rand() used in the question. If other numbers are required, modify as needed.
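As a rough sketch of such a modification, the helper below draws values from a caller-chosen interval instead. The names RandomFactors and randomArray and the low/high parameters are hypothetical, and it is plain Scala, so it would still need to be wrapped in a udf to be used on a dataframe.

```scala
import scala.util.Random

object RandomFactors {
  // Hypothetical helper: `rank` floats drawn uniformly from [low, high),
  // up to float rounding. Passing the Random in makes the result reproducible.
  def randomArray(rank: Int, low: Float, high: Float, rng: Random): Array[Float] =
    Array.fill(rank)(low + (high - low) * rng.nextFloat())
}
```

For example, `RandomFactors.randomArray(3, -1.0f, 1.0f, new Random(42L))` returns three floats between -1 and 1, which is closer to the mix of positive and negative values that ALS factors tend to have.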
Using a rank of 3 and the usersDf:
val usersFactorsNew = usersDf.withColumn("userFeatures", createRandomArray(lit(3)))
will give a dataframe as follows (of course with random feature values)
+----+----------------------------------------------------------+
|user|userFeatures |
+----+----------------------------------------------------------+
|567 |[0.6866711267486822,0.7257031656127676,0.983562255688249] |
|678 |[0.7013908820314967,0.41029552817665327,0.554591149586789]|
+----+----------------------------------------------------------+
A union of this dataframe with the uF dataframe should now be possible. Since union matches columns by position, the differing column names do not get in the way.
The reason the UDF did not work is most likely that it returns an Array[Double], while you need an Array[Float] for the union. It should be possible to fix this with a map(_.toFloat):
val toArr: org.apache.spark.ml.linalg.Vector => Array[Float] = _.toArray.map(_.toFloat)
val toArrUdf = udf(toArr)
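To see the conversion in isolation, here is a small sketch in plain Scala. ToFloatArray and toFloats are made-up names, and the Array[Double] argument stands in for what Vector.toArray returns.

```scala
object ToFloatArray {
  // Same conversion as the body of toArr above, applied to a plain
  // Array[Double] instead of a Spark ML Vector.
  def toFloats(values: Array[Double]): Array[Float] = values.map(_.toFloat)
}
```

Keep in mind that narrowing to Float drops precision (0.1 as a Float is not exactly 0.1 as a Double). That should not matter for random placeholder factors, and it matches the precision of the factors ALS produces.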
Upvotes: 1