mon

Reputation: 22234

How to create a distributed sparse matrix in Spark from DataFrame in Scala

Question

Please help me find ways to create a distributed matrix from (user, feature, value) records in a DataFrame, where the features and their values are each stored in a single column.

An excerpt of the data is below, but there are a large number of users and features, and not all features are tested for every user. Hence many feature values are null and should be imputed as 0.

For instance, a blood test may have sugar level, cholesterol level, etc. as features. If a level is not acceptable, the value is set to 1. But not all features will be tested for every user (or patient).

+----+-------+-----+
|user|feature|value|
+----+-------+-----+
|  14|      0|    1|
|  14|    222|    1|
|  14|    200|    1|
|  22|      0|    1|
|  22|     32|    1|
|  22|    147|    1|
|  22|    279|    1|
|  22|    330|    1|
|  22|    363|    1|
|  22|    162|    1|
|  22|    811|    1|
|  22|    290|    1|
|  22|    335|    1|
|  22|    681|    1|
|  22|    786|    1|
|  22|    789|    1|
|  22|    842|    1|
|  22|    856|    1|
|  22|    881|    1|
+----+-------+-----+

If the features were already columns, there are documented ways to do this.

But that is not the case here, so one option is to pivot the DataFrame and then apply those methods.

+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user|  0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  14|  1|  0|  0|  0|  1|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|  22|  1|  1|  1|  1|  0|  0|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
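
A minimal sketch of that pivot, for illustration only. It assumes a local SparkSession and a small DataFrame named `featureData` built from a few rows of the excerpt; both names are placeholders, not part of the original setup.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.first

// Assumption: a local session, used only for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("pivot-sketch").getOrCreate()
import spark.implicits._

// A few rows taken from the excerpt above.
val featureData = Seq((14, 0, 1), (14, 222, 1), (14, 200, 1), (22, 0, 1), (22, 32, 1))
  .toDF("user", "feature", "value")

// One column per distinct feature; features not tested for a user come out as null.
val pivoted = featureData
  .groupBy("user")
  .pivot("feature")
  .agg(first("value"))
  .na.fill(0) // impute the missing values with 0

pivoted.orderBy("user").show()
```

Every distinct feature becomes a column here, which is why the width of the pivoted DataFrame grows with the number of features.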

Then use row to vector conversion. I suppose using one of these:

However, since many null values would have to be imputed as 0, the pivoted DataFrame would consume far more memory. Pivoting a large DataFrame distributed across multiple nodes would also cause heavy shuffling.

Hence I am looking for advice, ideas, and suggestions.

Environment

Spark 2.4.4

Upvotes: 5

Views: 3738

Answers (2)

mon

Reputation: 22234

Solution

  1. Map each input line to a (user, feature) pair, giving an RDD[(user, feature)].
  2. groupByKey to create an RDD[(user, [feature+])].
  3. Create an RDD[IndexedRow] where each IndexedRow represents one user's row, as below, across all existing features.
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user|  0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  14|  1|  0|  0|  0|  1|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
  4. Convert the RDD[IndexedRow] into an IndexedRowMatrix.

For the product operation, convert the IndexedRowMatrix into a BlockMatrix, which supports distributed multiplication.

Convert each original record into IndexedRow

import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

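// Build one sparse row per user: a 1.0 at every feature id recorded for that user.
def toIndexedRow(userToFeaturesMap: (Int, Iterable[Int]), maxFeatureId: Int): IndexedRow = {
    userToFeaturesMap match {
        case (userId, featureIDs) =>
            // distinct guards against repeated (user, feature) records,
            // which Vectors.sparse rejects as duplicate indices.
            val featureCountKV = featureIDs.toSeq.distinct.map(i => (i, 1.0))
            // Vector width is maxFeatureId + 1 because feature ids start at 0.
            IndexedRow(userId, Vectors.sparse(maxFeatureId + 1, featureCountKV))
    }
}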

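import org.apache.spark.sql.functions.max

// Assumption: featureData is the (user, feature, value) DataFrame from the question, with integer columns.
val maxFeatureId = featureData.agg(max("feature")).head.getInt(0)

val userToFeatureCounters = featureData.rdd
    .map(rowPF => (rowPF.getInt(0), rowPF.getInt(1)))  // Row => (userId, featureId)
    .groupByKey()                                      // (userId, Iterable(featureId))
    .map(
        userToFeatureIDsMap => toIndexedRow(userToFeatureIDsMap, maxFeatureId)
    )                                                  // IndexedRow(userId, sparse vector with 1.0 per featureId)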
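
To make the memory argument concrete, here is a small sketch of what a single row looks like, using user 14 from the excerpt. The name `exampleMaxFeatureId` is a placeholder for this illustration, and 881 is simply the largest feature id visible in the sample data.

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.IndexedRow

// Assumption: 881 is the largest feature id, as in the excerpt shown in the question.
val exampleMaxFeatureId = 881

// User 14 has features 0, 222 and 200 set; Vectors.sparse sorts the entries by index.
val user14 = IndexedRow(
  14L,
  Vectors.sparse(exampleMaxFeatureId + 1, Seq((0, 1.0), (222, 1.0), (200, 1.0)))
)

println(user14.vector.size)        // prints 882, the logical width of the row
println(user14.vector.numNonzeros) // prints 3, the entries actually stored
```

Only the three non-zero entries are stored, whereas the pivoted form would hold all 882 cells for this user.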

Create the IndexedRowMatrix

val userFeatureIndexedMatrix = new IndexedRowMatrix(userToFeatureCounters)

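Transpose the IndexedRowMatrix via BlockMatrix, as IndexedRowMatrix does not support transpose

// Assumed missing step: derive the BlockMatrix from the IndexedRowMatrix created above.
val userFeatureBlockMatrix = userFeatureIndexedMatrix.toBlockMatrix()
val userFeatureBlockMatrixTransposed = userFeatureBlockMatrix
    .transpose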

Create the product with BlockMatrix, as IndexedRowMatrix.multiply requires a local DenseMatrix on the right.

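// (user x feature) * (feature x user) yields a user x user matrix;
// multiply the transpose by the original instead to get feature x feature.
val featuresTogetherIndexedMatrix = userFeatureBlockMatrix
    .multiply(userFeatureBlockMatrixTransposed)
    .toIndexedRowMatrix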
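
As a possible alternative that is not part of the approach above: since the input is already a list of (row, column, value) triples, MLlib's CoordinateMatrix can be built from it without a groupByKey. This is only a sketch, assuming a local SparkSession and a small placeholder DataFrame named `triples`.

```scala
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.sql.SparkSession

// Assumption: a local session, used only for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("coordinate-sketch").getOrCreate()
import spark.implicits._

// Placeholder data in the same (user, feature, value) shape as the question.
val triples = Seq((14, 0, 1), (14, 222, 1), (22, 32, 1)).toDF("user", "feature", "value")

// Each record maps directly to one non-zero entry (row = user, column = feature).
val entries = triples.rdd.map { r =>
  MatrixEntry(r.getInt(0).toLong, r.getInt(1).toLong, r.getInt(2).toDouble)
}
val coordinateMatrix = new CoordinateMatrix(entries)

// Convert only when block-wise operations such as multiply are needed.
val blockMatrix = coordinateMatrix.toBlockMatrix()

// Dimensions are inferred as (max index + 1) in each direction.
println(s"${coordinateMatrix.numRows()} x ${coordinateMatrix.numCols()}") // prints 23 x 223
```

Whether this is preferable depends on the later operations; CoordinateMatrix itself offers few of them beyond transpose and conversion to the other distributed types.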

Upvotes: 1

Vladislav Varslavans

Reputation: 2934

Maybe you could transform each user's records into a JSON representation, e.g.:

{
  "user": 14,
  "features": [
    {
      "feature": 0,
      "value": 1
    },
    {
      "feature": 222,
      "value": 1
    }
  ]
}

But it all depends on how you would use your "distributed matrix" later on.
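
One way such a nested representation could be produced, as a rough sketch only. It assumes a local SparkSession and a placeholder DataFrame named `records` with the same three columns as in the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, struct}

// Assumption: a local session, used only for this illustration.
val spark = SparkSession.builder().master("local[*]").appName("json-sketch").getOrCreate()
import spark.implicits._

// Placeholder data in the same (user, feature, value) shape as the question.
val records = Seq((14, 0, 1), (14, 222, 1), (22, 0, 1)).toDF("user", "feature", "value")

// Collect each user's (feature, value) pairs into one array column.
val nested = records
  .groupBy("user")
  .agg(collect_list(struct(col("feature"), col("value"))).as("features"))

// Render every user as a single JSON document.
nested.toJSON.collect().foreach(println)
```

The order of elements inside each `features` array is not guaranteed, because collect_list follows whatever order the rows arrive in after the shuffle.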

Upvotes: 0
