Reputation: 492
I calculated a correlation matrix in Spark and I want to extract the individual correlations together with their column names.
Correlation Matrix
correlMatrix: org.apache.spark.mllib.linalg.Matrix =
1.0 -0.33333333333333254 -0.8164965809277261 -0.7777777777777787
-0.33333333333333254 1.0 0.8164965809277356 -0.33333333333333254
-0.8164965809277261 0.8164965809277356 1.0 0.27216552697591645
-0.7777777777777787 -0.33333333333333254 0.27216552697591645 1.0
DataFrame column names
colNames: Array[String] = Array(item_1, item_2, item_3, item_4)
Now I want to extract each pairwise combination into a DataFrame with the following structure:
item_from | item_to | Correlation
item_1 | item_2 | -0.0096912
item_1 | item_3 | -0.7313071
item_2 | item_3 | 0.68910356
Or at least the whole correlation matrix with column names:
item_1 item_2 item_3 item_4
item_1 1.0 -0.33333333333333254 -0.8164965809277261 -0.7777777777777787
item_2 -0.33333333333333254 1.0 0.8164965809277356 -0.33333333333333254
item_3 -0.8164965809277261 0.8164965809277356 1.0 0.27216552697591645
item_4 -0.7777777777777787 -0.33333333333333254 0.27216552697591645 1.0
I've tried to write a map function, but it didn't work as I expected.
Is there any solution you could suggest?
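For context, here is a minimal sketch of the kind of code that yields such a matrix (the DataFrame name df and the Pearson method are illustrative assumptions, not my exact code):
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

// `df` is assumed to be a DataFrame holding the four numeric item columns
val colNames = df.columns
val rows = df.rdd.map(row => Vectors.dense(colNames.map(c => row.getAs[Double](c))))
val correlMatrix = Statistics.corr(rows, "pearson")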
Upvotes: 1
Views: 794
Reputation: 1043
// Pair every column name with every other column name (all ordered pairs)
val colNamePairs = colNames.flatMap(c1 => colNames.map(c2 => (c1, c2)))
// Zip the name pairs with the flattened matrix values; since the matrix is
// symmetric, the flattening order lines up with the pairs either way
val triplesList = colNamePairs.zip(correlMatrix.toArray)
  .filterNot(p => p._1._1 >= p._1._2)   // keep only the upper triangle, without the diagonal
  .map(p => (p._1._1, p._1._2, p._2))
// toDF on an RDD of tuples needs the SQL implicits (import spark.implicits._ outside the shell)
val corrDF = sc.parallelize(triplesList).toDF("item_from", "item_to", "Correlation")
colNamePairs produces all the ordered combinations of column names, and triplesList is the list of triples (colName1, colName2, correlation) obtained by zipping those pairs with the flattened matrix values.
Finally, we convert it to a DataFrame with the sought column names.
Please note that the filterNot is optional: it is there only to keep half of the matrix (excluding the diagonal), since the matrix is symmetric and the other half is redundant. If you want the full list, just remove it.
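With the matrix and colNames from the question, corrDF should then contain rows along these lines (row order in the output may vary):
item_from | item_to | Correlation
item_1 | item_2 | -0.33333333333333254
item_1 | item_3 | -0.8164965809277261
item_1 | item_4 | -0.7777777777777787
item_2 | item_3 | 0.8164965809277356
item_2 | item_4 | -0.33333333333333254
item_3 | item_4 | 0.27216552697591645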
Upvotes: 2