Reputation: 492
I'm very new to Scala and Spark 2.1. I'm trying to calculate the correlation between many columns in a DataFrame which looks like this:
item_1 | item_2 | item_3 | item_4
1 | 1 | 4 | 3
2 | 0 | 2 | 0
0 | 2 | 0 | 1
Here is what I've tried:
val df = sqlContext.createDataFrame(
  Seq((1, 1, 4, 3),
      (2, 0, 2, 0),
      (0, 2, 0, 1))
).toDF("item_1", "item_2", "item_3", "item_4")
val items = df.select(array(df.columns.map(col(_)): _*)).rdd.map(_.getSeq[Double](0))
And calculate the correlation between elements:
val correlMatrix: Matrix = Statistics.corr(items, "pearson")
This fails with the following error message:
<console>:89: error: type mismatch;
found : org.apache.spark.rdd.RDD[Seq[Double]]
required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
val correlMatrix: Matrix = Statistics.corr(items, "pearson")
I don't know how to create the org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
from a data frame.
This might be a really easy task but I kinda struggle with it and I'm happy for any advice.
Upvotes: 1
Views: 2675
Reputation: 2333
If your goal is to perform Pearson correlations, you don't really have to use RDDs and Vectors. Here's an example of performing Pearson correlations directly on DataFrame columns (the columns in question are of type Double).
Code:
import org.apache.spark.sql.{SQLContext, Row, DataFrame}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType}
import org.apache.spark.sql.functions._
val rb = spark.read.option("delimiter","|").option("header","false").option("inferSchema","true").format("csv").load("rb.csv").toDF("name","beerId","brewerId","abv","style","appearance","aroma","palate","taste","overall","time","reviewer").cache()
rb.agg(
  corr("overall","taste"),
  corr("overall","aroma"),
  corr("overall","palate"),
  corr("overall","appearance"),
  corr("overall","abv")
).show()
In this example, I'm reading a CSV file into a DataFrame (with a custom delimiter, no header, and inferred data types) and then running a single agg call against the DataFrame that computes several correlations at once.
Output:
+--------------------+--------------------+---------------------+-------------------------+------------------+
|corr(overall, taste)|corr(overall, aroma)|corr(overall, palate)|corr(overall, appearance)|corr(overall, abv)|
+--------------------+--------------------+---------------------+-------------------------+------------------+
| 0.8762432795943761| 0.789023067942876| 0.7008942639550395| 0.5663593891357243|0.3539158620897098|
+--------------------+--------------------+---------------------+-------------------------+------------------+
As you can see from the results, the (overall, taste) columns are highly correlated, while (overall, abv) not so much.
Here's a link to the Scala Docs DataFrame page which has the Aggregation Correlation Function.
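To see what those corr values mean on the toy DataFrame from the question, the same statistic can be computed in plain Scala without Spark. This is only a sketch; the `pearson` helper below is my own name, not a Spark API:

```scala
// Hand-rolled Pearson correlation, the statistic that Spark's corr()
// aggregate computes; plain Scala, no Spark required.
def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
  val n = xs.length
  val mx = xs.sum / n
  val my = ys.sum / n
  val cov = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
  val sx = math.sqrt(xs.map(x => (x - mx) * (x - mx)).sum)
  val sy = math.sqrt(ys.map(y => (y - my) * (y - my)).sum)
  cov / (sx * sy)
}

// item_1 and item_2 from the question's DataFrame, read column-wise
val item1 = Seq(1.0, 2.0, 0.0)
val item2 = Seq(1.0, 0.0, 2.0)
println(pearson(item1, item2)) // ≈ -1.0: perfectly anti-correlated
```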
Upvotes: 2
Reputation: 330413
You can for example use VectorAssembler. Assemble the vectors and convert to an RDD:
import org.apache.spark.ml.feature.VectorAssembler
val rows = new VectorAssembler().setInputCols(df.columns).setOutputCol("vs")
  .transform(df)
  .select("vs")
  .rdd
Extract the Vector from each Row:
Spark 1.x:
rows.map(_.getAs[org.apache.spark.mllib.linalg.Vector](0))
Spark 2.x:
rows
.map(_.getAs[org.apache.spark.ml.linalg.Vector](0))
.map(org.apache.spark.mllib.linalg.Vectors.fromML)
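Feeding the extracted vectors into Statistics.corr(items, "pearson") as in the question then yields a 4x4 matrix. As a sanity check, the expected entries for the toy data can be reproduced in plain Scala; the `pearson` helper below is my own, not part of Spark:

```scala
// Hand-rolled Pearson correlation, used here only to check the matrix
// that Statistics.corr(items, "pearson") should return for the toy data.
def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
  val mx = xs.sum / xs.length
  val my = ys.sum / ys.length
  val cov = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
  val sx = math.sqrt(xs.map(x => (x - mx) * (x - mx)).sum)
  val sy = math.sqrt(ys.map(y => (y - my) * (y - my)).sum)
  cov / (sx * sy)
}

// The four columns of the question's DataFrame
val cols = Seq(
  Seq(1.0, 2.0, 0.0), // item_1
  Seq(1.0, 0.0, 2.0), // item_2
  Seq(4.0, 2.0, 0.0), // item_3
  Seq(3.0, 0.0, 1.0)  // item_4
)

val matrix = cols.map(a => cols.map(b => pearson(a, b)))
matrix.foreach(row => println(row.map(v => f"$v%7.4f").mkString(" ")))
// Diagonal ≈ 1.0; e.g. corr(item_1, item_2) ≈ -1.0
```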
Regarding your code: you have Integer columns, not Double, so array gives you an array<int> and you cannot use _.getSeq[Double](0).
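The Integer-vs-Double pitfall can be reproduced without Spark: because of type erasure, casting a sequence of boxed Ints to Seq[Double] compiles and even "succeeds" at runtime, but throws the moment an element is read back as a Double. A minimal sketch:

```scala
// The toy columns hold Ints, so this is roughly what getSeq hands back.
val ints: Seq[Any] = Seq(1, 2, 0)

// Type erasure lets this cast "succeed" at runtime...
val wrong = ints.asInstanceOf[Seq[Double]]

// ...but unboxing an element as Double throws ClassCastException
// (java.lang.Integer cannot be cast to java.lang.Double).
val blewUp =
  try { val d: Double = wrong.head; false }
  catch { case _: ClassCastException => true }

// The safe route: convert explicitly before treating values as Double.
val fixed = ints.map { case i: Int => i.toDouble }
println(blewUp) // true
println(fixed)  // List(1.0, 2.0, 0.0)
```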
Upvotes: 6