user9074332

Reputation: 2636

Spark Scala - Split Array of Structs into Dataframe Columns

I have a nested source JSON file that contains an array of structs. The number of structs varies greatly from row to row, and I would like to use Spark (Scala) to dynamically create new dataframe columns from the key/values of the structs, where the key is the column name and the value is the column value.

Example minified JSON record

{"key1":{"key2":{"key3":"AK","key4":"EU","key5":{"key6":"001","key7":"N","values":[{"name":"valuesColumn1","value":"9.876"},{"name":"valuesColumn2","value":"1.2345"},{"name":"valuesColumn3","value":"8.675309"}]}}}}

dataframe schema

scala> val df = spark.read.json("file:///tmp/nested_test.json")
scala> df.printSchema
root
 |-- key1: struct (nullable = true)
 |    |-- key2: struct (nullable = true)
 |    |    |-- key3: string (nullable = true)
 |    |    |-- key4: string (nullable = true)
 |    |    |-- key5: struct (nullable = true)
 |    |    |    |-- key6: string (nullable = true)
 |    |    |    |-- key7: string (nullable = true)
 |    |    |    |-- values: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |-- value: string (nullable = true)

What's been done so far

df.select(
    ($"key1.key2.key3").as("key3"),
    ($"key1.key2.key4").as("key4"),
    ($"key1.key2.key5.key6").as("key6"),
    ($"key1.key2.key5.key7").as("key7"),
    ($"key1.key2.key5.values").as("values")).
    show(truncate=false)

+----+----+----+----+----------------------------------------------------------------------------+
|key3|key4|key6|key7|values                                                                      |
+----+----+----+----+----------------------------------------------------------------------------+
|AK  |EU  |001 |N   |[[valuesColumn1, 9.876], [valuesColumn2, 1.2345], [valuesColumn3, 8.675309]]|
+----+----+----+----+----------------------------------------------------------------------------+

There is an array of 3 structs here, but the 3 structs need to be split into 3 separate columns dynamically (the number of elements can vary greatly), and I am not sure how to do it.

Sample desired output

Notice that there were 3 new columns produced for each of the array elements within the values array.

+----+----+----+----+-------------+-------------+-------------+
|key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+-------------+-------------+-------------+
|AK  |EU  |001 |N   |9.876        |1.2345       |8.675309     |
+----+----+----+----+-------------+-------------+-------------+

Reference

I believe that the desired solution is something similar to what was discussed in this SO post but with 2 main differences:

  1. The number of columns is hardcoded to 3 in the SO post, but in my circumstance the number of array elements is unknown.
  2. The column names need to be driven by the name column, and the column values by the value column:
...
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |-- value: string (nullable = true)

Upvotes: 0

Views: 1010

Answers (2)

user9074332

Reputation: 2636

I found that this approach, using an explode followed by a pivot, performed much better and was easier to understand:

val json = """{"key1":{"key2":{"key3":"AK","key4":"EU","key5":{"key6":"001","key7":"N","values":[{"name":"valuesColumn1","value":"9.876"},{"name":"valuesColumn2","value":"1.2345"},{"name":"valuesColumn3","value":"8.675309"}]}}}}"""

val df = spark.read.json(Seq(json).toDS())

// schema
df.printSchema
root
 |-- key1: struct (nullable = true)
 |    |-- key2: struct (nullable = true)
 |    |    |-- key3: string (nullable = true)
 |    |    |-- key4: string (nullable = true)
 |    |    |-- key5: struct (nullable = true)
 |    |    |    |-- key6: string (nullable = true)
 |    |    |    |-- key7: string (nullable = true)
 |    |    |    |-- values: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |-- value: string (nullable = true)

// create final df
val finalDf = df.
    select(
      $"key1.key2.key3".as("key3"),
      $"key1.key2.key4".as("key4"),
      $"key1.key2.key5.key6".as("key6"),
      $"key1.key2.key5.key7".as("key7"),
      explode($"key1.key2.key5.values").as("values")
    ).
    groupBy(
      $"key3", $"key4", $"key6", $"key7"
    ).
    pivot("values.name").
    agg(min("values.value"))

// result
finalDf.show
+----+----+----+----+-------------+-------------+-------------+
|key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+-------------+-------------+-------------+
|  AK|  EU| 001|   N|        9.876|       1.2345|     8.675309|
+----+----+----+----+-------------+-------------+-------------+
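As a side note, if the set of names is known up front, pivot also has a two-argument overload that takes the pivot values explicitly, which spares Spark the extra job it otherwise runs to discover the distinct names. A minimal sketch, assuming the example's three names are known ahead of time:

// sketch: passing the pivot values explicitly (assumed known here) avoids
// the extra distinct-values job that the single-argument pivot triggers
val knownNames = Seq("valuesColumn1", "valuesColumn2", "valuesColumn3")

val finalDfExplicit = df.
    select(
      $"key1.key2.key3".as("key3"),
      $"key1.key2.key4".as("key4"),
      $"key1.key2.key5.key6".as("key6"),
      $"key1.key2.key5.key7".as("key7"),
      explode($"key1.key2.key5.values").as("values")
    ).
    groupBy($"key3", $"key4", $"key6", $"key7").
    pivot("values.name", knownNames).
    agg(min($"values.value"))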

Upvotes: 0

Rakhi Agrawal

Reputation: 917

You could do it this way:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("first Program").getOrCreate()
import spark.implicits._

val json = """{"key1":{"key2":{"key3":"AK","key4":"EU","key5":{"key6":"001","key7":"N","values":[{"name":"valuesColumn1","value":"9.876"},{"name":"valuesColumn2","value":"1.2345"},{"name":"valuesColumn3","value":"8.675309"}]}}}}"""

val df1 = spark.read.json(Seq(json).toDS())

val df2 = df1.select(
    ($"key1.key2.key3").as("key3"),
    ($"key1.key2.key4").as("key4"),
    ($"key1.key2.key5.key6").as("key6"),
    ($"key1.key2.key5.key7").as("key7"),
    ($"key1.key2.key5.values").as("values")
)

val numColsVal = df2
    .withColumn("values_size", size($"values"))
    .agg(max($"values_size"))
    .head()
    .getInt(0)

// Column names for the final output: df2's columns plus the distinct
// names found in the array (sorted ascending), added as null columns
val finalDFColumns = df2.select(explode($"values").as("values")).
    select("values.*").select("name").distinct.
    map(_.getAs[String](0)).orderBy($"value".asc).collect.
    foldLeft(df2.limit(0))((cdf, c) => cdf.withColumn(c, lit(null))).columns

// One column per array element: the value field supplies the cell value,
// and the Column's toString serves as a temporary column name
val finalDF = df2.select($"*" +: (0 until numColsVal).map(i =>
    $"values".getItem(i)("value").as($"values".getItem(i)("name").toString)): _*)

// Zip the actual and desired column names and rename via foldLeft,
// then do the same again while also dropping the raw values column
finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).show(false)
finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).drop($"values").show(false)

The resulting final output:

+----+----+----+----+-------------+-------------+-------------+
|key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+-------------+-------------+-------------+
|AK  |EU  |001 |N   |9.876        |1.2345       |8.675309     |
+----+----+----+----+-------------+-------------+-------------+

Hope I got your question right!

----------- EDIT with Explanation -----------

This block gets the number of columns to be created for the array structure.

val numColsVal = df2
        .withColumn("values_size", size($"values"))
        .agg(max($"values_size"))
        .head()
        .getInt(0)

finalDFColumns holds the expected output column names, taken from an empty DF that is built with all of those columns initialized to null.

The block below returns the distinct column names that need to be created from the array structure.

df2.select(explode($"values").as("values")).
    select("values.*").select("name").distinct.
    map(_.getAs[String](0)).orderBy($"value".asc).collect

The block below combines these new columns with the other columns of df2, all initialized to null values:

foldLeft(df2.limit(0))((cdf, c) => cdf.withColumn(c, lit(null)))

Combining these two blocks and printing the output, you get:

+----+----+----+----+------+-------------+-------------+-------------+
|key3|key4|key6|key7|values|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+------+-------------+-------------+-------------+
+----+----+----+----+------+-------------+-------------+-------------+

Now the structure is ready, and we need the values for the corresponding columns. The block below gets us the values:

df2.select($"*" +: (0 until numColsVal).map(i => $"values".getItem(i)("value").as($"values".getItem(i)("name").toString)): _*)

This results in the following:

+----+----+----+----+--------------------+---------------+---------------+---------------+
|key3|key4|key6|key7|              values|values[0][name]|values[1][name]|values[2][name]|
+----+----+----+----+--------------------+---------------+---------------+---------------+
|  AK|  EU| 001|   N|[[valuesColumn1, ...|          9.876|         1.2345|       8.675309|
+----+----+----+----+--------------------+---------------+---------------+---------------+

Now we need to rename the columns to the names we built in the first block above. So we zip the two column lists together and use the foldLeft method to rename the output columns, as below:

finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).show(false)

This results in the structure below:

+----+----+----+----+--------------------+-------------+-------------+-------------+
|key3|key4|key6|key7|              values|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+--------------------+-------------+-------------+-------------+
|  AK|  EU| 001|   N|[[valuesColumn1, ...|        9.876|       1.2345|     8.675309|
+----+----+----+----+--------------------+-------------+-------------+-------------+

We are almost there. We now just need to remove the unwanted values column like this:

finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).drop($"values").show(false)

This results in the expected output, as follows:

+----+----+----+----+-------------+-------------+-------------+
|key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+-------------+-------------+-------------+
|AK  |EU  |001 |N   |9.876        |1.2345       |8.675309     |
+----+----+----+----+-------------+-------------+-------------+
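As an aside, since finalDFColumns comes out in the same positional order as finalDF's columns, the zip + foldLeft rename could also be collapsed into a single toDF call, which takes the complete list of new column names. A sketch under that assumption:

// sketch: rename all columns at once; this relies on finalDFColumns lining
// up positionally with finalDF.columns, which holds by construction here
finalDF.toDF(finalDFColumns: _*).drop("values").show(false)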

I'm not sure I was able to explain it clearly, but if you break the statements above apart and print the intermediate results, you will get a feel for how we reach the output. You can find explanations and examples of the individual functions used in this logic online.
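For example, printing just the distinct names extracted from the array shows one of those intermediate steps in isolation:

// one intermediate worth printing on its own: the distinct name values
df2.select(explode($"values").as("v")).select($"v.name").distinct.show(false)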

Upvotes: 1
