Reputation: 22234
Please help me find ways to create a distributed matrix from (user, feature, value) records in a DataFrame, where the features and their values are stored in a column.
An excerpt of the data is below, but there are a large number of users and features, and not all features are tested for every user. Hence lots of feature values are null and need to be imputed to 0.
For instance, a blood test may have sugar level, cholesterol level, etc. as features. If those levels are not acceptable, then 1 is set as the value. But not all of the features will be tested for every user (or patient).
+----+-------+-----+
|user|feature|value|
+----+-------+-----+
| 14| 0| 1|
| 14| 222| 1|
| 14| 200| 1|
| 22| 0| 1|
| 22| 32| 1|
| 22| 147| 1|
| 22| 279| 1|
| 22| 330| 1|
| 22| 363| 1|
| 22| 162| 1|
| 22| 811| 1|
| 22| 290| 1|
| 22| 335| 1|
| 22| 681| 1|
| 22| 786| 1|
| 22| 789| 1|
| 22| 842| 1|
| 22| 856| 1|
| 22| 881| 1|
+----+-------+-----+
If the features were already columns, there are documented ways to do this.
But that is not the case here, so one way could be to pivot the DataFrame and then apply those methods, producing something like the table below (a pivot sketch follows the table).
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user| 0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| 14| 1| 0| 0| 0| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
| 22| 1| 1| 1| 1| 0| 0| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
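A minimal sketch of the pivot, assuming the DataFrame of records is named featureData (the name is an assumption; pick whatever your DataFrame is called):

import org.apache.spark.sql.functions.first

// Pivot (user, feature, value) into one column per feature and impute nulls to 0.
val pivoted = featureData
  .groupBy("user")
  .pivot("feature")
  .agg(first("value"))
  .na.fill(0)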
Then use a row-to-vector conversion; one option is sketched below.
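For example, with VectorAssembler (an assumption, since the original links are not shown here), applied to the pivoted DataFrame from above:

import org.apache.spark.ml.feature.VectorAssembler

// Assemble every feature column (all columns except "user") into one vector column.
val assembler = new VectorAssembler()
  .setInputCols(pivoted.columns.filter(_ != "user"))
  .setOutputCol("features")
val vectorized = assembler.transform(pivoted)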
However, since there will be many null values to impute to 0, the pivoted DataFrame will consume far more memory, and pivoting a large DataFrame distributed among multiple nodes would cause a large shuffle.
Hence, I am seeking advice, ideas, and suggestions.
Spark 2.4.4
Upvotes: 5
Views: 3738
Reputation: 22234
Build the matrix below as sparse rows directly from the (user, feature) records, without materializing the pivoted DataFrame:
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user| 0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| 14| 1| 0| 0| 0| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
| 22| 1| 1| 1| 1| 0| 0| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
For the product operation, convert the IndexedRowMatrix into a BlockMatrix, which supports products in a distributed manner (an IndexedRowMatrix can only be multiplied by a local matrix).
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max

// Build one sparse IndexedRow per user from that user's feature IDs.
def toIndexedRow(userToFeaturesMap: (Int, Iterable[Int]), maxFeatureId: Int): IndexedRow = {
  userToFeaturesMap match {
    case (userId, featureIDs) =>
      val featureCountKV = featureIDs.map(i => (i, 1.0)).toSeq
      new IndexedRow(
        userId,
        Vectors.sparse(maxFeatureId + 1, featureCountKV)
      )
  }
}

// The sparse vector size must cover the largest feature ID.
val maxFeatureId = featureData.agg(max("feature")).head.getInt(0)

val userToFeatureCounters = featureData.rdd
  .map(rowPF => (rowPF.getInt(0), rowPF.getInt(1)))  // Row -> (userId, featureId)
  .groupByKey()                                      // (userId, Iterable(featureId))
  .map(userToFeatureIDsMap => toIndexedRow(userToFeatureIDsMap, maxFeatureId)) // IndexedRow(userId, sparse features)

val userFeatureIndexedMatrix = new IndexedRowMatrix(userToFeatureCounters)

// Only BlockMatrix supports distributed matrix-matrix multiplication.
val userFeatureBlockMatrix = userFeatureIndexedMatrix.toBlockMatrix()
val userFeatureBlockMatrixTransposed = userFeatureBlockMatrix.transpose

val featuresTogetherIndexedMatrix = userFeatureBlockMatrix
  .multiply(userFeatureBlockMatrixTransposed)
  .toIndexedRowMatrix
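Note that multiplying the user-by-feature matrix by its transpose yields a user-by-user matrix; for a feature-by-feature co-occurrence matrix instead, reverse the order of the multiplication. To sanity-check the result on a small sample, something like the following could be used (a hypothetical inspection, not part of the original code):

featuresTogetherIndexedMatrix.rows
  .take(5)
  .foreach(println)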
Upvotes: 1
Reputation: 2934
Maybe you could transform each row into a JSON representation, e.g.:
{
  "user": 14,
  "features": [
    {
      "feature": 0,
      "value": 1
    },
    {
      "feature": 222,
      "value": 1
    }
  ]
}
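A minimal sketch of producing that JSON shape with Spark SQL functions, assuming the input DataFrame is named featureData as in the earlier answer:

import org.apache.spark.sql.functions.{collect_list, struct, to_json}

// Group each user's (feature, value) pairs and render one JSON document per user.
val jsonPerUser = featureData
  .groupBy("user")
  .agg(collect_list(struct("feature", "value")).as("features"))
  .select(to_json(struct("user", "features")).as("json"))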
But it all depends on how you would use your "distributed matrix" later on.
Upvotes: 0