mzbaran

Reputation: 624

Taking average of nested vectors of doubles in Struct

I have a column that is an array of struct in a Spark DataFrame like

|-- sTest: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- value: string (nullable = true)
 |    |    |-- embed: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)

that has a variable number of nested arrays ("embed") that all have equal length.

For each row, I want to take the average of these embeddings and assign the result as a column to a new dataframe (the existing + the new column).

I have read about some people using explode, but this is not quite what I want. Ultimately I want to compute, for each row, an aggregate producing the average embedding (array(float)).
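Setting Spark aside for a moment, the intended per-row semantics can be sketched in plain Scala (the `averageEmbedding` helper here is hypothetical, just to illustrate the element-wise mean over equal-length arrays):

```scala
// Hypothetical helper illustrating the desired per-row computation:
// given a row's embeddings (all the same length), return their
// element-wise average.
def averageEmbedding(embeds: Seq[Seq[Int]]): Seq[Double] = {
  val n = embeds.length
  embeds
    .map(_.map(_.toDouble))
    // element-wise sum of all embeddings
    .reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
    // divide each component by the number of embeddings
    .map(_ / n)
}

println(averageEmbedding(Seq(Seq(1, 2, 3), Seq(4, 5, 6))))
// List(2.5, 3.5, 4.5)
```

This is only the logic; the actual question is how to express it over a Spark column.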

A minimal example of a dataframe with ArrayType(StructType) column:

val structureData = Seq(
  Row(Seq(Row("value1", Seq(1, 2, 3)), Row("value1", Seq(4, 5, 6)))),
  Row(Seq(Row("value2", Seq(4, 5, 6)), Row("value2", Seq(1, 1, 1))))
)

val structureSchema = new StructType()
  .add("sTest", ArrayType(new StructType()
    .add("value", StringType)
    .add("embed", ArrayType(IntegerType))))

The desired output would be

Row(2.5, 3.5, 4.5)
Row(2.5, 3.0, 3.5)

Upvotes: 2

Views: 429

Answers (1)

memoryz

Reputation: 494

Your data essentially looks like a matrix, and you are trying to summarize that matrix by column, so it's natural to reach for Summarizer from the org.apache.spark.ml.stat package.

Input data:

case class sTest(value: String, embed: Seq[Int])

val df = Seq(
  Tuple1(Seq(
    sTest("value1", Seq(1, 2, 3)), 
    sTest("value2", Seq(4, 5, 6))
  )),
  Tuple1(Seq(
    sTest("value3", Seq(4, 5, 6)), 
    sTest("value4", Seq(1, 1, 1))
  ))
).toDF("nested")

Calculate the average:

import org.apache.spark.sql.functions._
import org.apache.spark.ml.linalg.{Vectors, Vector}
import org.apache.spark.ml.stat.Summarizer

val array2vecUdf = udf((array: Seq[Int]) => {
  Vectors.dense(array.toArray.map(_.toDouble))
})

val vec2arrayUdf = udf((vec: Vector) => {
  vec.toArray
})

val stage1 = df
  // Create a rowid so we can explode, extract the embed field as a vector and collect.
  .withColumn("rowid", monotonically_increasing_id)
  .withColumn("exp", explode($"nested"))
  .withColumn("embed", $"exp".getItem("embed"))
  .withColumn("embed_vec", array2vecUdf($"embed"))

val avg = Summarizer.metrics("mean").summary($"embed_vec")

val stage2 = stage1
  .groupBy("rowid")
  .agg(avg.alias("avg_vec"))
  // Convert back from vector to array.
  .select(vec2arrayUdf($"avg_vec.mean").alias("avgs"))

stage2.show(false)

Result:

+---------------+
|avgs           |
+---------------+
|[2.5, 3.5, 4.5]|
|[2.5, 3.0, 3.5]|
+---------------+

Upvotes: 2
