Dan

Reputation: 487

PySpark: How to flatten nested arrays by merging values in Spark

I have 10000 JSONs with different ids, and each has 10000 names. How can I flatten the nested arrays by merging values by int or str in PySpark?

EDIT: I have added the column name_10000_xvz to better explain the data structure. I have also updated the Notes, Input df, required output df and input JSON files.

Notes:

Input df:

root
 |-- id: long (nullable = true)
 |-- name_10000_xvz: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- name_1_a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- name_1_b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- name_2_a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)

+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
|id |name_10000_xvz                                                          |name_1_a                         |name_1_b                         |name_2_a                            |
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
|2  |[{1990, 39}, {2000, 30}, {2001, 31}, {2002, 32}, {2003, 33}, {2004, 34}]|[{2001, 1}, {2002, 2}, {2003, 3}]|[{2001, 4}, {2002, 5}, {2003, 6}]|[{2001, 21}, {2002, 22}, {2003, 23}]|
|1  |[{2000, 30}, {2001, 31}, {2002, 32}, {2003, 33}]                        |[{2001, 1}, {2002, 2}, {2003, 3}]|[{2001, 4}, {2002, 5}, {2003, 6}]|[{2001, 21}, {2002, 22}, {2003, 23}]|
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+

Required output df:

+---+---------+----------+-----------+---------+----------------+
|id |   date  | name_1_a | name_1_b  |name_2_a | name_10000_xvz |
+---+---------+----------+-----------+---------+----------------+
|1  |   2000  |     0    |    0      |   0     |        30      |
|1  |   2001  |     1    |    4      |   21    |        31      |
|1  |   2002  |     2    |    5      |   22    |        32      |
|1  |   2003  |     3    |    6      |   23    |        33      |
|2  |   1990  |     0    |    0      |   0     |        39      |
|2  |   2000  |     0    |    0      |   0     |        30      |
|2  |   2001  |     1    |    4      |   21    |        31      |
|2  |   2002  |     2    |    5      |   22    |        32      |
|2  |   2003  |     3    |    6      |   23    |        33      |
|2  |   2004  |     0    |    0      |   0     |        34      |
+---+---------+----------+-----------+---------+----------------+

To reproduce input df:

df = spark.read.json(sc.parallelize([
  """{"id":1,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"name_10000_xvz":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33}]}""",
  """{"id":2,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"name_10000_xvz":[{"date":1990,"val":39},{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33},{"date":2004,"val":34}]}}"""
]))

Useful links:

Upvotes: 2

Views: 2881

Answers (2)

Kafels

Reputation: 4059

UPDATE

As @werner has mentioned, it's necessary to transform each struct so that the column name is appended to it.

import pyspark.sql.functions as f

names = [column for column in df.columns if column.startswith('name_')]

# tag each struct with the column it came from: (name, date, val)
expressions = []
for name in names:
  expressions.append(f.expr('TRANSFORM({name}, el -> STRUCT("{name}" AS name, el.date, el.val))'.format(name=name)))

# concatenate all tagged arrays and explode them into one row per struct
flatten_df = (df
              .withColumn('flatten', f.flatten(f.array(*expressions)))
              .selectExpr('id', 'inline(flatten)'))

# pivot the name tag back into one column per original array
output_df = (flatten_df
             .groupBy('id', 'date')
             .pivot('name', names)
             .agg(f.first('val')))

output_df.sort('id', 'date').show(truncate=False)
+---+----+--------------+--------+--------+--------+
|id |date|name_10000_xvz|name_1_a|name_1_b|name_2_a|
+---+----+--------------+--------+--------+--------+
|1  |2000|30            |null    |null    |null    |
|1  |2001|31            |1       |4       |21      |
|1  |2002|32            |2       |5       |22      |
|1  |2003|33            |3       |6       |23      |
|2  |1990|39            |null    |null    |null    |
|2  |2000|30            |null    |null    |null    |
|2  |2001|31            |1       |4       |21      |
|2  |2002|32            |2       |5       |22      |
|2  |2003|33            |3       |6       |23      |
|2  |2004|34            |null    |null    |null    |
+---+----+--------------+--------+--------+--------+
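The required output in the question shows 0 instead of null for the dates where a name has no value. A minimal follow-up, assuming 0 is an acceptable filler for every name column, is to fill the nulls after the pivot:

output_df = output_df.na.fill(0, names)  # replace nulls with 0 only in the pivoted name columns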

OLD

Assuming:

  • the date values are always the same across all columns
  • name_1_a, name_1_b and name_2_a all have the same size

import pyspark.sql.functions as f

output_df = (df
             .withColumn('flatten', f.expr('TRANSFORM(SEQUENCE(0, size(name_1_a) - 1), i -> ' \
                                           'STRUCT(name_1_a[i].date AS date, ' \
                                           '       name_1_a[i].val AS name_1_a, ' \
                                           '       name_1_b[i].val AS name_1_b, ' \
                                           '       name_2_a[i].val AS name_2_a))'))
             .selectExpr('id', 'inline(flatten)'))

output_df.sort('id', 'date').show(truncate=False)
+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1  |2001|1       |4       |21      |
|1  |2002|2       |5       |22      |
|1  |2003|3       |6       |23      |
|2  |2001|1       |4       |21      |
|2  |2002|2       |5       |22      |
|2  |2003|3       |6       |23      |
+---+----+--------+--------+--------+

Upvotes: 3

stack0114106

Reputation: 8711

How are the naming conventions used?

Can you try something below using spark-sql?

df.createOrReplaceTempView("df")
spark.sql("""
select id, 
name_1_a.date[0] as date, name_1_a.val[0] as name_1_a, name_1_b.val[0] as name_1_b, name_2_a.val[0] as name_2_a
from df
""").show(false)

+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1  |2001|1       |4       |21      |
|2  |2001|1       |4       |21      |
+---+----+--------+--------+--------+

Here are my assumptions.

  1. The first field is id and the rest are all name columns, 1 to n, like name_1_a, name_1_b, name_2_a, etc.
  2. The date is the same across all "n" names, so I can use the first name column to derive it.

Building up the dataframe.

JSON strings

val jsonstr1 = """{  "id": 1,  "name_1_a": [    {      "date": 2001,      "val": 1    },    {      "date": 2002,      "val": 2    },    {      "date": 2003,      "val": 3    }  ],  "name_1_b": [    {      "date": 2001,      "val": 4    },    {      "date": 2002,      "val": 5    },    {      "date": 2003,      "val": 6    }  ],  "name_2_a": [    {      "date": 2001,      "val": 21    },    {      "date": 2002,      "val": 22    },    {      "date": 2003,      "val": 23    }  ]}"""

val jsonstr2 = """{  "id": 2,  "name_1_a": [    {      "date": 2001,      "val": 1    },    {      "date": 2002,      "val": 2    },    {      "date": 2003,      "val": 3    }  ],  "name_1_b": [    {      "date": 2001,      "val": 4    },    {      "date": 2002,      "val": 5    },    {      "date": 2003,      "val": 6    }  ],  "name_2_a": [    {      "date": 2001,      "val": 21    },    {      "date": 2002,      "val": 22    },    {      "date": 2003,      "val": 23    }  ]}"""

Dataframes

val df1 = spark.read.json(Seq(jsonstr1).toDS)
val df2 = spark.read.json(Seq(jsonstr2).toDS)
val df = df1.union(df2)

Now create a view on top of df. I'm just naming it "df".

df.createOrReplaceTempView("df")

Show the data:

df.show(false)
df.printSchema

Use the metadata to construct the SQL string.

df.columns

Array[String] = Array(id, name_1_a, name_1_b, name_2_a)

val names = df.columns.drop(1)  // drop id
val sql1 = for { i <- 0 to 2
                 t1 = names.map(x => x + s".val[${i}] as ${x}").mkString(",")
                 t2 = names(0) + ".date[0] as date ," + t1
                 _ = println(t2)
               } yield s""" select id, ${t2} from df """
val sql2 = sql1.mkString(" union All ")

Now sql2 contains the string below, which is valid SQL:

" select id, name_1_a.date[0] as date ,name_1_a.val[0] as name_1_a,name_1_b.val[0] as name_1_b,name_2_a.val[0] as name_2_a from df  union All  select id, name_1_a.date[0] as date ,name_1_a.val[1] as name_1_a,name_1_b.val[1] as name_1_b,name_2_a.val[1] as name_2_a from df  union All  select id, name_1_a.date[0] as date ,name_1_a.val[2] as name_1_a,name_1_b.val[2] as name_1_b,name_2_a.val[2] as name_2_a from df "

Pass it to spark.sql(sql2) to get the required result:

spark.sql(sql2).orderBy("id").show(false)

+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1  |2001|2       |5       |22      |
|1  |2001|1       |4       |21      |
|1  |2001|3       |6       |23      |
|2  |2001|1       |4       |21      |
|2  |2001|3       |6       |23      |
|2  |2001|2       |5       |22      |
+---+----+--------+--------+--------+

Upvotes: 1
