Duesentrieb

Reputation: 492

Convert Spark Data Frame to org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]

I'm very new to Scala and Spark 2.1. I'm trying to calculate the correlation between many elements in a data frame which looks like this:

item_1 | item_2 | item_3 | item_4
     1 |      1 |      4 |      3
     2 |      0 |      2 |      0
     0 |      2 |      0 |      1

Here is what I've tried:

val df = sqlContext.createDataFrame(
  Seq((1, 1, 4, 3),
      (2, 0, 2, 0),
      (0, 2, 0, 1))
).toDF("item_1", "item_2", "item_3", "item_4")


val items = df.select(array(df.columns.map(col(_)): _*)).rdd.map(_.getSeq[Double](0))

And then calculate the correlation between the elements:

val correlMatrix: Matrix = Statistics.corr(items, "pearson")

This fails with the following error message:

<console>:89: error: type mismatch;
found   : org.apache.spark.rdd.RDD[Seq[Double]]
 required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
       val correlMatrix: Matrix = Statistics.corr(items, "pearson")

I don't know how to create the org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] from a data frame.

This might be a really easy task, but I'm struggling with it and would be happy for any advice.

Upvotes: 1

Views: 2675

Answers (2)

JamCon

Reputation: 2333

If your goal is to compute Pearson correlations, you don't really have to use RDDs and Vectors. Here's an example of computing Pearson correlations directly on DataFrame columns (the columns in question have Double types).

Code:

import org.apache.spark.sql.{SQLContext, Row, DataFrame}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType}
import org.apache.spark.sql.functions._


val rb = spark.read
  .option("delimiter", "|")
  .option("header", "false")
  .option("inferSchema", "true")
  .format("csv")
  .load("rb.csv")
  .toDF("name", "beerId", "brewerId", "abv", "style", "appearance",
        "aroma", "palate", "taste", "overall", "time", "reviewer")
  .cache()

rb.agg(
    corr("overall","taste"),
    corr("overall","aroma"),
    corr("overall","palate"),
    corr("overall","appearance"),
    corr("overall","abv")
    ).show()

In this example, I'm loading a DataFrame (with a custom delimiter, no header, and inferred data types), and then simply calling agg on the DataFrame with multiple correlations inside it.



Output:

+--------------------+--------------------+---------------------+-------------------------+------------------+
|corr(overall, taste)|corr(overall, aroma)|corr(overall, palate)|corr(overall, appearance)|corr(overall, abv)|
+--------------------+--------------------+---------------------+-------------------------+------------------+
|  0.8762432795943761|   0.789023067942876|   0.7008942639550395|       0.5663593891357243|0.3539158620897098|
+--------------------+--------------------+---------------------+-------------------------+------------------+

As you can see from the results, the (overall, taste) columns are highly correlated, while (overall, abv) is much less so.
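
Applied to the small DataFrame from the question, the same approach might look like this (a minimal sketch, assuming the df defined in the question; corr casts the integer columns to Double):

import org.apache.spark.sql.functions.corr

// Pairwise Pearson correlations computed directly on the DataFrame columns.
df.agg(
    corr("item_1", "item_2"),
    corr("item_1", "item_3"),
    corr("item_2", "item_4")
).show()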

The corr aggregate function is documented in the Scala API docs for org.apache.spark.sql.functions.

Upvotes: 2

zero323

Reputation: 330413

You can, for example, use VectorAssembler. Assemble the vectors and convert to an RDD:

import org.apache.spark.ml.feature.VectorAssembler

val rows = new VectorAssembler().setInputCols(df.columns).setOutputCol("vs")
  .transform(df)
  .select("vs")
  .rdd

Extract Vectors from Row:

  • Spark 1.x:

    rows.map(_.getAs[org.apache.spark.mllib.linalg.Vector](0))
    
  • Spark 2.x:

    rows
      .map(_.getAs[org.apache.spark.ml.linalg.Vector](0))
      .map(org.apache.spark.mllib.linalg.Vectors.fromML)
    

Regarding your code:

  • You have Integer columns, not Double.
  • The data is not an array, so you cannot use _.getSeq[Double](0).
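
Putting it together for Spark 2.x, here's a minimal end-to-end sketch (assuming the df from the question):

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.{Matrix, Vectors}
import org.apache.spark.mllib.stat.Statistics

// Assemble all columns into a single ml.linalg.Vector column.
val rows = new VectorAssembler()
  .setInputCols(df.columns)
  .setOutputCol("vs")
  .transform(df)
  .select("vs")
  .rdd

// Convert each ml Vector to the mllib Vector expected by Statistics.corr.
val items = rows
  .map(_.getAs[org.apache.spark.ml.linalg.Vector](0))
  .map(Vectors.fromML)

val correlMatrix: Matrix = Statistics.corr(items, "pearson")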

Upvotes: 6
