Michal

Reputation: 1905

Creating a Random Feature Array in Spark DataFrames

When creating an ALS model, we can extract a userFactors DataFrame and an itemFactors DataFrame. These DataFrames contain a column with an Array.

I would like to generate some random data and union it to the userFactors DataFrame.

Here is my code:

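import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.DataFrame

// toDF requires spark.implicits._ to be in scope
val df1: DataFrame = Seq((123, 456, 4.0), (123, 789, 5.0), (234, 456, 4.5), (234, 789, 1.0)).toDF("user", "item", "rating")
val model1 = new ALS()
  .setImplicitPrefs(true)
  .fit(df1)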

val iF = model1.itemFactors
val uF = model1.userFactors

I then create a random DataFrame using a VectorAssembler with this function:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.rand

def makeNew(df: DataFrame, rank: Int): DataFrame = {
  var df_dummy = df
  var inputCols: Array[String] = Array()
  // Note: "0 to rank" is inclusive, so this adds rank + 1 random columns
  for (i <- 0 to rank) {
    df_dummy = df_dummy.withColumn("feature".concat(i.toString), rand())
    inputCols = inputCols :+ "feature".concat(i.toString)
  }
  val assembler = new VectorAssembler()
    .setInputCols(inputCols)
    .setOutputCol("userFeatures")
  val output = assembler.transform(df_dummy)
  output.select("user", "userFeatures")
}

I then create the DataFrame with new user IDs and add the random vectors and bias:

val usersDf: DataFrame = Seq((567), (678)).toDF("user")
var usersFactorsNew: DataFrame = makeNew(usersDf, 20)

The problem arises when I union the two DataFrames.

usersFactorsNew.union(uF) produces the error:

 org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. struct<type:tinyint,size:int,indices:array<int>,values:array<double>> <> array<float> at the second column of the second table;;

If I print the schema, the uF DataFrame has a feature vector of type Array[Float] and the usersFactorsNew DataFrame has a feature vector of type Vector.

My question is how to change the type of the Vector to an Array in order to perform the union.

I tried writing this udf with little success:

val toArr: org.apache.spark.ml.linalg.Vector => Array[Double] = _.toArray
val toArrUdf = udf(toArr)

Perhaps the VectorAssembler is not the best option for this task. However, at the moment, it is the only option I have found. I would love to get some recommendations for something better.

Upvotes: 1

Views: 1510

Answers (2)

Ramesh Maharjan

Reputation: 41957

Your process is correct, and even the udf function works. All you need to do is change the last part of the makeNew function to:

def makeNew(df: DataFrame, rank: Int): DataFrame = {
  var df_dummy = df
  var inputCols: Array[String] = Array()
  for (i <- 0 to rank) {
    df_dummy = df_dummy.withColumn("feature".concat(i.toString), rand())
    inputCols = inputCols :+ "feature".concat(i.toString)
  }
  val assembler = new VectorAssembler()
    .setInputCols(inputCols)
    .setOutputCol("userFeatures")
  val output = assembler.transform(df_dummy)
  // Convert the assembled Vector to an array with toArrUdf (defined in the question)
  // and name the columns to match uF ("id", "features")
  output.select(col("id"), toArrUdf(col("userFeatures")).as("features"))
}

With that change you should be good to go. Note that I created usersDf with an id column rather than a user column:

val usersDf: DataFrame = Seq((567), (678)).toDF("id")
var usersFactorsNew: DataFrame = makeNew(usersDf, 20)
usersFactorsNew.union(uF).show(false)

you should be getting

+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |features                                                                                                                                                                                                                                                                                                                                                                                                                                  |
+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|567|[0.8259185719733708, 0.327713892339658, 0.049547223031371046, 0.056661808506210054, 0.5846626163454274, 0.038497936270104005, 0.8970865088803417, 0.8840660648882804, 0.837866669938156, 0.9395263094918058, 0.09179528484355126, 0.4915430644129799, 0.11083447052043116, 0.5122858182953718, 0.4302683812966408, 0.3862741815833828, 0.6189322403095068, 0.3000371006293433, 0.09331299668168902, 0.7421838728601371, 0.855867963988993]|
|678|[0.7686514248005568, 0.5473580740023187, 0.072945344124282, 0.36648594574355287, 0.9780202082328863, 0.5289221651923784, 0.3719451099963028, 0.2824660794505932, 0.4873197501260199, 0.9364676464120849, 0.011539929543513794, 0.5240615794930654, 0.6282546154521298, 0.995256022569878, 0.6659179561266975, 0.8990775317754092, 0.08650071017556926, 0.5190186149992805, 0.056345335742325475, 0.6465357505620791, 0.17913532817943245] |
|123|[0.04177388548851013, 0.26762014627456665, -0.19617630541324615, 0.34298020601272583, 0.19632814824581146, -0.2748605012893677, 0.07724890112876892, 0.4277132749557495, 0.1927199512720108, -0.40271613001823425]                                                                                                                                                                                                                        |
|234|[0.04139673709869385, 0.26520395278930664, -0.19440513849258423, 0.3398836553096771, 0.1945556253194809, -0.27237895131111145, 0.07655145972967148, 0.42385169863700867, 0.19098000228405, -0.39908021688461304]                                                                                                                                                                                                                          |
+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Upvotes: 1

Shaido

Reputation: 28352

Instead of creating a dummy dataframe and using VectorAssembler to generate a random feature vector, you can simply use a UDF directly. The userFactors from the ALS model hold their features as an Array[Float], so the output of the UDF should match that.

import scala.util.Random
import org.apache.spark.sql.functions.{lit, udf}

val createRandomArray = udf((rank: Int) => {
  Array.fill(rank)(Random.nextFloat)
})

Note that this will give numbers in the interval [0.0, 1.0), the same range as the rand() used in the question. If other numbers are required, modify as needed.
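If a different range is needed, the array generation can be pulled out into a plain Scala function. The following is only a minimal sketch: the names RandomFactors, randomFactors, low, high and rng are hypothetical and not part of Spark or of the code above, and the values are uniform in approximately [low, high).

```scala
import scala.util.Random

object RandomFactors {
  // Hypothetical helper (not a Spark API): returns `rank` floats drawn
  // uniformly from roughly [low, high) using the supplied generator.
  def randomFactors(rank: Int, low: Float, high: Float, rng: Random): Array[Float] =
    Array.fill(rank)(low + (high - low) * rng.nextFloat())
}

// Example: three values between -1 and 1 from a seeded generator.
val sample: Array[Float] = RandomFactors.randomFactors(3, -1.0f, 1.0f, new Random(42))
```

Such a function could then be wrapped the same way as above, for example udf((rank: Int) => RandomFactors.randomFactors(rank, -1.0f, 1.0f, new Random())). Since the result is random, it may also be worth marking the UDF with asNondeterministic() (available on UserDefinedFunction in Spark 2.3 and later) so that the optimizer does not assume repeated calls return the same value.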

Using a rank of 3 and the usersDf from the question:

val usersFactorsNew = usersDf.withColumn("userFeatures", createRandomArray(lit(3)))

will give a dataframe as follows (of course with random feature values)

+----+----------------------------------------------------------+
|user|userFeatures                                              |
+----+----------------------------------------------------------+
|567 |[0.6866711267486822,0.7257031656127676,0.983562255688249] |
|678 |[0.7013908820314967,0.41029552817665327,0.554591149586789]|
+----+----------------------------------------------------------+

A union of this dataframe with the uF dataframe should now be possible.


The UDF in the question most likely did not work because it returns an Array[Double], while you need an Array[Float] for the union. It should be possible to fix this with a map(_.toFloat):

val toArr: org.apache.spark.ml.linalg.Vector => Array[Float] = _.toArray.map(_.toFloat)
val toArrUdf = udf(toArr)
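
As a possible alternative to a hand-written UDF, Spark 3.0 and later ship a built-in vector_to_array function in org.apache.spark.ml.functions. The sketch below assumes Spark 3.0+ is on the classpath; the object name VectorToArraySketch and the two-column stand-in dataframe are hypothetical and only serve to keep the example self-contained.

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, rand}

object VectorToArraySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("vector-to-array-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for the dataframe built by makeNew: two random columns.
    val withRandom = Seq(567, 678).toDF("id")
      .withColumn("feature0", rand())
      .withColumn("feature1", rand())

    val assembled = new VectorAssembler()
      .setInputCols(Array("feature0", "feature1"))
      .setOutputCol("userFeatures")
      .transform(withRandom)

    // dtype "float32" requests float elements, matching the Array[Float]
    // column of the ALS userFactors; the default dtype is "float64".
    val asArray = assembled.select(
      col("id"),
      vector_to_array(col("userFeatures"), "float32").as("features"))

    asArray.printSchema()
    spark.stop()
  }
}
```

On Spark versions before 3.0 this function is not available, so the UDF above remains the way to go there.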

Upvotes: 1
