Reputation: 79
I have a DataFrame like the one below in Spark, and I want to group it by the id column and then, for each row of the grouped data, create a sparse vector with elements from the weight column at the indices specified by the index column. The length of the sparse vector is known, say 1000 for this example.
DataFrame df:
+-----+------+-----+
| id|weight|index|
+-----+------+-----+
|11830| 1| 8|
|11113| 1| 3|
| 1081| 1| 3|
| 2654| 1| 3|
|10633| 1| 3|
|11830| 1| 28|
|11351| 1| 12|
| 2737| 1| 26|
|11113| 3| 2|
| 6590| 1| 2|
+-----+------+-----+
I have read this, which is sort of similar to what I want to do, but for an RDD. Does anyone know of a good way to do this for a DataFrame in Spark using Scala?
My attempt so far is to first collect the weights and indices as lists like this:
import org.apache.spark.sql.functions.collect_list

val dfWithLists = df
  .groupBy("id")
  .agg(collect_list("weight") as "weights", collect_list("index") as "indices")
which looks like:
+-----+---------+----------+
| id| weights| indices|
+-----+---------+----------+
|11830| [1, 1]| [8, 28]|
|11113| [1, 3]| [3, 2]|
| 1081| [1]| [3]|
| 2654| [1]| [3]|
|10633| [1]| [3]|
|11351| [1]| [12]|
| 2737| [1]| [26]|
| 6590| [1]| [2]|
+-----+---------+----------+
Then I define a UDF and do something like this:
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf
def toSparseVector: ((Array[Int], Array[BigInt]) => Vector) = {(a1, a2) => Vectors.sparse(1000, a1, a2.map(x => x.toDouble))}
val udfToSparseVector = udf(toSparseVector)
val dfWithSparseVector = dfWithLists.withColumn("SparseVector", udfToSparseVector($"indices", $"weights"))
but this doesn't seem to work, and it feels like there should be an easier way to do it without needing to collect the weights and indices into lists first.
I'm pretty new to Spark, DataFrames and Scala, so any help is highly appreciated.
Upvotes: 3
Views: 4144
Reputation: 1464
You have to collect them, since vectors have to be local, single-machine data structures: https://spark.apache.org/docs/latest/mllib-data-types.html#local-vector
For creating the sparse vectors you have two options: passing unordered (index, value) pairs, or specifying the indices and values arrays: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.Vectors$
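To illustrate the two overloads, here is a minimal sketch with made-up values, using the vector size of 1000 from the question:
import org.apache.spark.mllib.linalg.Vectors

// Option 1: a Seq of unordered (index, value) pairs, sorted by index internally
val v1 = Vectors.sparse(1000, Seq((3, 1.0), (2, 3.0)))

// Option 2: parallel arrays of indices (in increasing order) and values
val v2 = Vectors.sparse(1000, Array(2, 3), Array(3.0, 1.0))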
If you can get the data into a different format (pivoted), you could also make use of the VectorAssembler: https://spark.apache.org/docs/latest/ml-features.html#vectorassembler
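If you go that route, a minimal sketch might look like the following (the pivot and column handling are assumptions about your data; note the assembled vector's length is the number of distinct indices in the data, not a fixed 1000):
import org.apache.spark.ml.feature.VectorAssembler

// Pivot so each distinct index becomes its own column holding the summed weight
val pivoted = df.groupBy("id").pivot("index").sum("weight").na.fill(0)

// Assemble all non-id columns into a single vector column
val assembler = new VectorAssembler()
  .setInputCols(pivoted.columns.filter(_ != "id"))
  .setOutputCol("features")
val assembled = assembler.transform(pivoted)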
With some small tweaks you can get your approach working:
:paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val df = Seq((11830, 1, 8), (11113, 1, 3), (1081, 1, 3), (2654, 1, 3), (10633, 1, 3), (11830, 1, 28), (11351, 1, 12), (2737, 1, 26), (11113, 3, 2), (6590, 1, 2)).toDF("id", "weight", "index")
val dfWithFeat = df
  .rdd
  // key each row by id, pairing the index with the weight as a Double
  .map(r => (r.getInt(0), (r.getInt(2), r.getInt(1).toDouble)))
  // gather all (index, weight) pairs belonging to the same id
  .groupByKey()
  // build a length-1000 sparse vector from the unordered (index, value) pairs
  .map(r => LabeledPoint(r._1, Vectors.sparse(1000, r._2.toSeq)))
  .toDS
dfWithFeat.printSchema
dfWithFeat.show(10, false)
// Exiting paste mode, now interpreting.
root
|-- label: double (nullable = true)
|-- features: vector (nullable = true)
+-------+-----------------------+
|label |features |
+-------+-----------------------+
|11113.0|(1000,[2,3],[3.0,1.0]) |
|2737.0 |(1000,[26],[1.0]) |
|10633.0|(1000,[3],[1.0]) |
|1081.0 |(1000,[3],[1.0]) |
|6590.0 |(1000,[2],[1.0]) |
|11830.0|(1000,[8,28],[1.0,1.0])|
|2654.0 |(1000,[3],[1.0]) |
|11351.0|(1000,[12],[1.0]) |
+-------+-----------------------+
dfWithFeat: org.apache.spark.sql.Dataset[org.apache.spark.mllib.regression.LabeledPoint] = [label: double, features: vector]
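For reference, the UDF approach from the question can also be made to work: Spark passes array columns into a Scala UDF as Seq, not Array, so the parameter types have to be declared accordingly. A minimal sketch, assuming the dfWithLists from the question (and that the two collected lists are pairwise aligned, which holds when both collect_list calls run in the same aggregation):
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.functions.udf

// Declare Seq parameters, since that is what Spark actually passes for array columns
val udfToSparseVector = udf { (indices: Seq[Int], weights: Seq[Int]) =>
  Vectors.sparse(1000, indices.zip(weights.map(_.toDouble)))
}
val dfWithSparseVector = dfWithLists.withColumn("SparseVector", udfToSparseVector($"indices", $"weights"))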
Upvotes: 6