Reputation: 2767
New to Scala (a PySpark guy) and trying to calculate the cosine similarity between rows (items).
Followed this post to create a sample DataFrame as an example:
Spark, Scala, DataFrame: create feature vectors
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions._  // sum, when, lit
val df = sc.parallelize(Seq(
(1, "cat1", 1), (1, "cat2", 3), (1, "cat9", 5), (2, "cat4", 6),
(2, "cat9", 2), (2, "cat10", 1), (3, "cat1", 5), (3, "cat7", 16),
(3, "cat8", 2))).toDF("userID", "category", "frequency")
// Create a sorted array of categories
val categories = df
.select($"category")
.distinct.map(_.getString(0))
.collect
.sorted
// Prepare the vector assembler
val assembler = new VectorAssembler()
.setInputCols(categories)
.setOutputCol("features")
// Aggregation expressions
val exprs = categories.map(
c => sum(when($"category" === c, $"frequency").otherwise(lit(0))).alias(c))
val transformed = assembler.transform(
df.groupBy($"userID").agg(exprs.head, exprs.tail: _*))
.select($"userID", $"features")
transformed.show
+------+--------------------+
|userID| features|
+------+--------------------+
| 1|(7,[0,2,6],[1.0,3...|
| 3|(7,[0,4,5],[5.0,1...|
| 2|(7,[1,3,6],[1.0,6...|
+------+--------------------+
Trying to follow this post to convert the df into an IndexedRowMatrix, and having trouble with the Scala syntax for mapping the RDD properly:
Calculate Cosine Similarity Spark Dataframe
import org.apache.spark.sql.Row
val irm = new IndexedRowMatrix(transformed.rdd.map {
Row(_, v: org.apache.spark.ml.linalg.Vector) =>
org.apache.spark.mllib.linalg.Vectors.fromML(v)
}.zipWithIndex.map { case (v, i) => IndexedRow(i, v) })
<console>:5: error: not a legal formal parameter.
Note: Tuples cannot be directly destructured in method or function parameters.
Either create a single parameter accepting the Tuple1,
or consider a pattern matching anonymous function: `{ case (param1, param1) => ... }
Row(_, v: org.apache.spark.ml.linalg.Vector) =>
^
Thanks!
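Edit: from the compiler's hint, it looks like the lambda just needs a case to become a pattern-matching anonymous function. A minimal sketch of what I think the corrected mapping should be (the fromML conversion assumes Spark 2.x, where ml and mllib vectors are distinct types):
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

// `case Row(...)` is a pattern-matching anonymous function,
// which is what the error message above suggests
val irm = new IndexedRowMatrix(transformed.rdd.map {
  case Row(_, v: org.apache.spark.ml.linalg.Vector) =>
    org.apache.spark.mllib.linalg.Vectors.fromML(v)
}.zipWithIndex.map { case (v, i) => IndexedRow(i, v) })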
Upvotes: 0
Views: 2208
Reputation: 1574
Try this one, with RowMatrix:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

def convertDataFrameToRowMatrix(df: DataFrame): RowMatrix = {
  val rows = df.count()
  val cols = df.columns.length
  // assumes the second column of df holds the features as Seq[Double]
  val rdd: RDD[org.apache.spark.mllib.linalg.Vector] = df.rdd.map(
    row => org.apache.spark.mllib.linalg.Vectors.dense(row.getAs[Seq[Double]](1).toArray))
  new RowMatrix(rdd, rows, cols)
}
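As a quick sketch of how this ties back to the cosine-similarity goal (featureDf is a hypothetical DataFrame whose second column is Seq[Double]):
// hypothetical input DataFrame
val rm: RowMatrix = convertDataFrameToRowMatrix(featureDf)
// columnSimilarities() computes cosine similarities between the columns of rm
val colSims = rm.columnSimilarities()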
And with IndexedRowMatrix:
def convertDataFrameToIndexedMatrix(df: DataFrame): IndexedRowMatrix = {
  val rows: Long = df.count()
  val cols = df.columns.length
  // zipWithIndex assigns each row its own index; using the total row
  // count as the index would give every row the same index
  val rdd = df.rdd.zipWithIndex.map { case (row, i) =>
    IndexedRow(i, org.apache.spark.mllib.linalg.Vectors.dense(row.getAs[Seq[Double]](1).toArray))
  }
  new IndexedRowMatrix(rdd, rows, cols)
}
If you want to convert an IndexedRowMatrix or RowMatrix to an RDD, that's simple:
def convertIndexedRowMatrixToRDD(irm: IndexedRowMatrix): RDD[IndexedRow] = irm.rows
def convertRowMatrixToRDD(rm: RowMatrix): RDD[org.apache.spark.mllib.linalg.Vector] = rm.rows
If you want to convert it to DataFrame, check this link.
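For instance, a minimal sketch of going from an IndexedRowMatrix back to a DataFrame (assuming spark.implicits._ is in scope; the column names here are made up):
import spark.implicits._

// one (index, values) pair per matrix row
val asDf = irm.rows
  .map(r => (r.index, r.vector.toArray))
  .toDF("index", "values")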
As an example of running the function:
val si = Seq((1, 2), (3, 4))
val myrdd: RDD[IndexedRow] = sc.parallelize(si).map(x => new IndexedRow(x._1.toLong, Vectors.dense(x._1, x._2)))
val irm: IndexedRowMatrix = new IndexedRowMatrix(myrdd)
val r = convertIndexedRowMatrixToRDD(irm)
r.foreach(println)
Output:
IndexedRow(3,[3.0,4.0])
IndexedRow(1,[1.0,2.0])
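And since the original question was about cosine similarity between rows: once you have the IndexedRowMatrix, something along these lines should work (columnSimilarities() compares columns, hence the transpose):
// cosine similarities between the rows of irm
val rowSims = irm.toCoordinateMatrix()
  .transpose()
  .toRowMatrix()
  .columnSimilarities()

rowSims.entries.foreach(println)  // MatrixEntry(i, j, similarity)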
Upvotes: 2