Reputation: 624
I have a Spark DataFrame with a column that is an array of structs:
|-- sTest: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- value: string (nullable = true)
| | |-- embed: array (nullable = true)
| | | |-- element: integer (containsNull = true)
that has a variable number of nested arrays ("embed") that all have equal length.
For each row, I want to take the average of these embeddings and assign the result as a column to a new dataframe (the existing + the new column).
I have read about some people using explode, but that is not quite what I want. Ultimately I want an aggregate per row that computes the average embedding (an array of floats).
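In plain Scala terms (ignoring Spark for a moment), the per-row computation I am after is an element-wise mean over equal-length arrays:

```scala
// Element-wise mean of equal-length integer arrays (plain Scala, no Spark).
val embeds = Seq(Seq(1, 2, 3), Seq(4, 5, 6))
// transpose groups the i-th elements together; then average each group.
val avg = embeds.transpose.map(col => col.sum.toDouble / col.size)
// avg: Seq[Double] = List(2.5, 3.5, 4.5)
```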
A minimal example of a DataFrame with an ArrayType(StructType) column:
val structureData = Seq(
Row(Seq(Row("value1", Seq(1, 2, 3)), Row("value1", Seq(4, 5, 6)))),
Row(Seq(Row("value2", Seq(4, 5, 6)), Row("value1", Seq(1, 1, 1))))
)
val structureSchema = new StructType()
.add("sTest", ArrayType(new StructType()
.add("value", StringType)
.add("embed", ArrayType(IntegerType))))
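For completeness, the rows and schema above can be combined into a DataFrame like this (a sketch, assuming a SparkSession named `spark` is in scope):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Build the DataFrame from the explicit Rows and schema defined above.
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(structureData),
  structureSchema
)
df.printSchema()
```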
The desired output would be
Row(2.5, 3.5, 4.5)
Row(2.5, 3, 3.5)
Upvotes: 2
Views: 429
Reputation: 494
Your data essentially looks like a matrix, and you are trying to summarize that matrix column-wise, so it is natural to use the Summarizer from the org.apache.spark.ml.stat package.
Input data:
case class sTest(value: String, embed: Seq[Int])
val df = Seq(
Tuple1(Seq(
sTest("value1", Seq(1, 2, 3)),
sTest("value2", Seq(4, 5, 6))
)),
Tuple1(Seq(
sTest("value3", Seq(4, 5, 6)),
sTest("value4", Seq(1, 1, 1))
))
).toDF("nested")
Calculate the average:
import org.apache.spark.sql.functions._
import org.apache.spark.ml.linalg.{Vectors, Vector}
import org.apache.spark.ml.stat.Summarizer
// Convert an integer array into a dense ML vector so Summarizer can consume it.
val array2vecUdf = udf((array: Seq[Int]) => {
Vectors.dense(array.toArray.map(_.toDouble))
})
// Convert the resulting mean vector back into a plain array.
val vec2arrayUdf = udf((vec: Vector) => {
vec.toArray
})
val stage1 = df
// Create a rowid so we can explode, extract the embed field as a vector and collect.
.withColumn("rowid", monotonically_increasing_id)
.withColumn("exp", explode($"nested"))
.withColumn("embed", $"exp".getItem("embed"))
.withColumn("embed_vec", array2vecUdf($"embed"))
val avg = Summarizer.metrics("mean").summary($"embed_vec")
val stage2 = stage1
.groupBy("rowid")
.agg(avg.alias("avg_vec"))
// Convert back from vector to array.
.select(vec2arrayUdf($"avg_vec.mean").alias("avgs"))
stage2.show(false)
Result:
+---------------+
|avgs |
+---------------+
|[2.5, 3.5, 4.5]|
|[2.5, 3.0, 3.5]|
+---------------+
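As an alternative, if you would rather avoid UDFs and the ML package, the same per-row average can be sketched with Spark's SQL higher-order functions (available from Spark 2.4), relying on the question's guarantee that all `embed` arrays within a row have equal length:

```scala
import org.apache.spark.sql.functions.expr

// For each index i, sum x.embed[i] over all structs in the row, then divide
// by the number of structs. No explode, rowid, or groupBy needed.
val avgs = df.select(
  expr("""
    transform(
      sequence(0, size(nested[0].embed) - 1),
      i -> aggregate(nested, 0D, (acc, x) -> acc + x.embed[i]) / size(nested)
    )
  """).alias("avgs")
)
```

This keeps everything in one pass per row and avoids UDF serialization overhead, at the cost of requiring Spark 2.4+.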
Upvotes: 2