Duesentrieb

Reputation: 492

columnSimilarities() back to Spark Data Frame

I have a second question about cosine similarity / columnSimilarities() in Spark 2.1. I'm fairly new to Scala and the Spark environment, and this is not really clear to me:

How can I get the column similarities for each combination of columns back from the RowMatrix in Spark? Here is what I tried:

Data:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, Row, DataFrame}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType}
import org.apache.spark.sql.functions._

// RDD of rows
val rowsRdd: RDD[Row] = sc.parallelize(
  Seq(
    Row(2.0, 7.0, 1.0),
    Row(3.5, 2.5, 0.0),
    Row(7.0, 5.9, 0.0)
  )
)

// Schema
val schema = new StructType()
  .add(StructField("item_1", DoubleType, true))
  .add(StructField("item_2", DoubleType, true))
  .add(StructField("item_3", DoubleType, true))

// DataFrame
val df = spark.createDataFrame(rowsRdd, schema)

Code:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, CoordinateMatrix, RowMatrix}

val rows = new VectorAssembler().setInputCols(df.columns).setOutputCol("vs")
  .transform(df)
  .select("vs")
  .rdd

val items_mllib_vector = rows.map(_.getAs[org.apache.spark.ml.linalg.Vector](0))
                             .map(org.apache.spark.mllib.linalg.Vectors.fromML)
val mat = new RowMatrix(items_mllib_vector)
val simsPerfect = mat.columnSimilarities()


println("Pairwise similarities are: " + simsPerfect.entries.collect().mkString(", "))

Output:

Pairwise similarities are: MatrixEntry(0,2,0.24759378423606918), MatrixEntry(1,2,0.7376189553526812), MatrixEntry(0,1,0.8355316482961213)
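
As a sanity check on what the two indices in each MatrixEntry mean, the similarities can be recomputed by hand from the example data. This is only a minimal sketch in plain Scala (no Spark); cosine is a hypothetical helper name, and the arrays are the three columns of the data above written out manually.

```scala
// Columns of the example DataFrame, copied from the data above.
val item1 = Array(2.0, 3.5, 7.0)
val item2 = Array(7.0, 2.5, 5.9)
val item3 = Array(1.0, 0.0, 0.0)

// Plain cosine similarity: dot(a, b) / (|a| * |b|).
// `cosine` is a hypothetical helper, not part of the Spark API.
def cosine(a: Array[Double], b: Array[Double]): Double = {
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  val normA = math.sqrt(a.map(x => x * x).sum)
  val normB = math.sqrt(b.map(x => x * x).sum)
  dot / (normA * normB)
}

// Pairs are labelled with zero-based column positions, like MatrixEntry(i, j, value).
println("(0,1): " + cosine(item1, item2))
println("(0,2): " + cosine(item1, item3))
println("(1,2): " + cosine(item2, item3))
```

The three values should agree with the MatrixEntry output above up to floating-point rounding, which suggests that index 0 is item_1, index 1 is item_2 and index 2 is item_3, i.e. the indices are zero-based positions in df.columns.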

So what I get is simsPerfect, an org.apache.spark.mllib.linalg.distributed.CoordinateMatrix holding the column indices and their similarities. How would I transform this back to a DataFrame and get the right column names with it?

My preferred output:

    item_from | item_to | similarity
            1 |       2 |       0.83
            1 |       3 |       0.24
            2 |       3 |       0.73

Thanks in advance

Upvotes: 2

Views: 2701

Answers (2)

BlueFeet

Reputation: 2507

This approach also works without converting the row to String:

val transformedRDD = simsPerfect.entries.map{case MatrixEntry(row: Long, col:Long, sim:Double) => (row,col,sim)}
val dff = sqlContext.createDataFrame(transformedRDD).toDF("item_from", "item_to", "sim")

Here I assume that val sqlContext = new org.apache.spark.sql.SQLContext(sc) is already defined and that sc is the SparkContext.
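
If the goal is to get the original column names instead of the numeric indices, one possible variation is to look the names up by index, since the MatrixEntry indices appear to be zero-based positions in df.columns. The following is only a sketch under assumptions: colNames stands in for df.columns, sims stands in for simsPerfect (rebuilt here from the entries printed in the question), and the local SparkSession is there purely to make the snippet self-contained.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// Assumption: a local session for illustration only.
// In spark-shell, `spark` already exists and these two lines can be skipped.
val spark = SparkSession.builder().master("local[*]").appName("sims-to-df").getOrCreate()
import spark.implicits._

// Stand-in for df.columns from the question.
val colNames = Array("item_1", "item_2", "item_3")

// Stand-in for simsPerfect, rebuilt from the output printed in the question.
val sims = new CoordinateMatrix(spark.sparkContext.parallelize(Seq(
  MatrixEntry(0, 2, 0.24759378423606918),
  MatrixEntry(1, 2, 0.7376189553526812),
  MatrixEntry(0, 1, 0.8355316482961213)
)))

// Replace each zero-based column index with the matching column name.
val simDf = sims.entries
  .map { case MatrixEntry(i, j, sim) => (colNames(i.toInt), colNames(j.toInt), sim) }
  .toDF("item_from", "item_to", "similarity")
  .orderBy("item_from", "item_to")

simDf.show()
```

With the real data, sims would be simsPerfect and colNames would be df.columns. Going through toDF on an RDD of tuples keeps similarity as a double column, so no string conversion is needed.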

Upvotes: 4

Duesentrieb

Reputation: 492

I found a solution for my problem:

//Transform result to rdd
val transformedRDD = simsPerfect.entries.map{case MatrixEntry(row: Long, col:Long, sim:Double) => Array(row,col,sim).mkString(",")}

//Transform rdd[String] to rdd[Row]
val rdd2 = transformedRDD.map(a => Row(a))

// to DF
val dfschema = StructType(Array(StructField("value",StringType)))
val rddToDF = spark.createDataFrame(rdd2,dfschema) 

//create new DF with schema
val newdf = rddToDF.select(expr("(split(value, ','))[0]").cast("string").as("item_from")
              ,expr("(split(value, ','))[1]").cast("string").as("item_to")
              ,expr("(split(value, ','))[2]").cast("string").as("sim"))

I'm sure there is an easier way to do this, but I'm happy that it works.

Upvotes: 0
