MojoJojo

Reputation: 4232

Spark: How to split struct type into multiple columns?

I know this question has been asked many times on Stack Overflow and satisfactorily answered in most posts, but I'm not sure whether the usual approaches are the best fit for my case. I have a Dataset with several struct types embedded in it:

root
 |-- STRUCT1: struct (nullable = true)
 |    |-- FIELD_1: string (nullable = true)
 |    |-- FIELD_2: long (nullable = true)
 |    |-- FIELD_3: integer (nullable = true)
 |-- STRUCT2: struct (nullable = true)
 |    |-- FIELD_4: string (nullable = true)
 |    |-- FIELD_5: long (nullable = true)
 |    |-- FIELD_6: integer (nullable = true)
 |-- STRUCT3: struct (nullable = true)
 |    |-- FIELD_7: string (nullable = true)
 |    |-- FIELD_8: long (nullable = true)
 |    |-- FIELD_9: integer (nullable = true)
 |-- ARRAYSTRUCT4: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- FIELD_10: integer (nullable = true)
 |    |    |-- FIELD_11: integer (nullable = true)

+-------+------------+------------+------------------+
|STRUCT1| STRUCT2    | STRUCT3    | ARRAYSTRUCT4     |
+-------+------------+------------+------------------+
|[1,2,3]|[aa, xx, yy]|[p1, q2, r3]|[[1a, 2b],[3c,4d]]|
+-------+------------+------------+------------------+

I want to convert this into:

1. A dataset where the structs are expanded into columns.
2. A dataset where the array (ARRAYSTRUCT4) is exploded into rows.

root
 |-- FIELD_1: string (nullable = true)
 |-- FIELD_2: long (nullable = true)
 |-- FIELD_3: integer (nullable = true)
 |-- FIELD_4: string (nullable = true)
 |-- FIELD_5: long (nullable = true)
 |-- FIELD_6: integer (nullable = true)
 |-- FIELD_7: string (nullable = true)
 |-- FIELD_8: long (nullable = true)
 |-- FIELD_9: integer (nullable = true)
 |-- FIELD_10: integer (nullable = true)
 |-- FIELD_11: integer (nullable = true)

+-------+-------+-------+-------+     +--------+--------+
|FIELD_1|FIELD_2|FIELD_3|FIELD_4| ... |FIELD_10|FIELD_11|
+-------+-------+-------+-------+     +--------+--------+
|1      |2      |3      |aa     |     |1a      |2b      |
+-------+-------+-------+-------+     +--------+--------+

To achieve this, I could use:

val expanded = df.select("STRUCT1.*", "STRUCT2.*", "STRUCT3.*", "ARRAYSTRUCT4")

followed by an explode:

val exploded = expanded.select(explode(expanded("ARRAYSTRUCT4")))
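
As written, that second select keeps only the exploded column and drops everything else, so in practice I'd combine the two steps into something like the sketch below (S4 is just a throwaway alias I made up; assumes spark.implicits._ and org.apache.spark.sql.functions.explode are in scope):

val result = df
  .withColumn("S4", explode($"ARRAYSTRUCT4"))  // one output row per array element
  .select($"STRUCT1.*", $"STRUCT2.*", $"STRUCT3.*", $"S4.*")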

However, I was wondering if there's a more functional way to do this, especially the select. I could use withColumn as below:

data.withColumn("FIELD_1", $"STRUCT1".getField("FIELD_1"))
      .withColumn("FIELD_2", $"STRUCT1".getField("FIELD_2"))
      .....

But I have 80+ columns. Is there a better way to achieve this?

Upvotes: 1

Views: 4701

Answers (1)

Leo C

Reputation: 22439

You can first make all columns struct-type by explode-ing any Array-of-struct columns into plain struct columns via foldLeft, then use map to expand each struct column name into a col.* selection, as shown below:

import org.apache.spark.sql.functions._
import spark.implicits._  // needed for toDF; provided automatically in the spark-shell

case class S1(FIELD_1: String, FIELD_2: Long, FIELD_3: Int)
case class S2(FIELD_4: String, FIELD_5: Long, FIELD_6: Int)
case class S3(FIELD_7: String, FIELD_8: Long, FIELD_9: Int)
case class S4(FIELD_10: Int, FIELD_11: Int)

val df = Seq(
  (S1("a1", 101, 11), S2("a2", 102, 12), S3("a3", 103, 13), Array(S4(1, 1), S4(3, 3))),
  (S1("b1", 201, 21), S2("b2", 202, 22), S3("b3", 203, 23), Array(S4(2, 2), S4(4, 4)))
).toDF("STRUCT1", "STRUCT2", "STRUCT3", "ARRAYSTRUCT4")

// +-----------+-----------+-----------+--------------+
// |    STRUCT1|    STRUCT2|    STRUCT3|  ARRAYSTRUCT4|
// +-----------+-----------+-----------+--------------+
// |[a1,101,11]|[a2,102,12]|[a3,103,13]|[[1,1], [3,3]]|
// |[b1,201,21]|[b2,202,22]|[b3,203,23]|[[2,2], [4,4]]|
// +-----------+-----------+-----------+--------------+

val arrayCols = df.dtypes.filter(t => t._2.startsWith("ArrayType(StructType")).
  map(_._1)
// arrayCols: Array[String] = Array(ARRAYSTRUCT4)

val expandedDF = arrayCols.foldLeft(df)((accDF, c) =>
  accDF.withColumn(c.replace("ARRAY", ""), explode(col(c))).drop(c)
)
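// expandedDF: ARRAYSTRUCT4 exploded into a plain struct column named "STRUCT4"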

val structCols = expandedDF.columns
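// structCols: Array[String] = Array(STRUCT1, STRUCT2, STRUCT3, STRUCT4)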

expandedDF.select(structCols.map(c => col(s"$c.*")): _*).
  show
// +-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+
// |FIELD_1|FIELD_2|FIELD_3|FIELD_4|FIELD_5|FIELD_6|FIELD_7|FIELD_8|FIELD_9|FIELD_10|FIELD_11|
// +-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+
// |     a1|    101|     11|     a2|    102|     12|     a3|    103|     13|       1|       1|
// |     a1|    101|     11|     a2|    102|     12|     a3|    103|     13|       3|       3|
// |     b1|    201|     21|     b2|    202|     22|     b3|    203|     23|       2|       2|
// |     b1|    201|     21|     b2|    202|     22|     b3|    203|     23|       4|       4|
// +-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+

Note that for simplicity it's assumed that your DataFrame has only struct-type and Array-of-struct columns. If there are other data types, adjust the filtering conditions for arrayCols and structCols accordingly.
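
For example, a minimal sketch of that generalization (not from the original answer; it assumes the same expandedDF and that non-struct columns should simply pass through unchanged):

// Expand struct-type columns to `col.*`; keep any other column as-is.
val selection = expandedDF.dtypes.map {
  case (name, dtype) if dtype.startsWith("StructType") => col(s"$name.*")
  case (name, _)                                        => col(name)
}

expandedDF.select(selection: _*)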

Upvotes: 3
