Reputation: 487
I have 10,000 JSONs with different ids, and each has 10,000 names. How can I flatten nested arrays in PySpark by merging values on an int or str key?
EDIT: I have added the column name_10000_xvz to better explain the data structure. I have updated the Notes, input df, required output df, and input JSON files as well.
Notes:

- id, date, val always follow the same naming convention across all columns and all JSONs
- date and val are always there, so they can be hardcoded
- date can be different in each array; for example, name_1_a starts with 2001, but name_10000_xvz for id == 1 starts with 2000 and finishes with 2004, whereas for id == 2 it starts with 1990 and finishes with 2004

Input df:
root
|-- id: long (nullable = true)
|-- name_10000_xvz: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- name_1_a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- name_1_b: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- name_2_a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
|id |name_10000_xvz |name_1_a |name_1_b |name_2_a |
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
|2 |[{1990, 39}, {2000, 30}, {2001, 31}, {2002, 32}, {2003, 33}, {2004, 34}]|[{2001, 1}, {2002, 2}, {2003, 3}]|[{2001, 4}, {2002, 5}, {2003, 6}]|[{2001, 21}, {2002, 22}, {2003, 23}]|
|1 |[{2000, 30}, {2001, 31}, {2002, 32}, {2003, 33}] |[{2001, 1}, {2002, 2}, {2003, 3}]|[{2001, 4}, {2002, 5}, {2003, 6}]|[{2001, 21}, {2002, 22}, {2003, 23}]|
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
Required output df:
+---+---------+----------+-----------+---------+----------------+
|id | date | name_1_a | name_1_b |name_2_a | name_10000_xvz |
+---+---------+----------+-----------+---------+----------------+
|1 | 2000 | 0 | 0 | 0 | 30 |
|1 | 2001 | 1 | 4 | 21 | 31 |
|1 | 2002 | 2 | 5 | 22 | 32 |
|1 | 2003 | 3 | 6 | 23 | 33 |
|2 | 1990 | 0 | 0 | 0 | 39 |
|2 | 2000 | 0 | 0 | 0 | 30 |
|2 | 2001 | 1 | 4 | 21 | 31 |
|2 | 2002 | 2 | 5 | 22 | 32 |
|2 | 2003 | 3 | 6 | 23 | 33 |
|2 | 2004 | 0 | 0 | 0 | 34 |
+---+---------+----------+-----------+---------+----------------+
To reproduce input df:
df = spark.read.json(sc.parallelize([
"""{"id":1,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"name_10000_xvz":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33}]}""",
"""{"id":2,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"name_10000_xvz":[{"date":1990,"val":39},{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33},{"date":2004,"val":34}]}}"""
]))
Upvotes: 2
Views: 2881
Reputation: 4059
As @werner has mentioned, it's necessary to transform each struct to append the column name into it.
import pyspark.sql.functions as f

# all array columns that need to be flattened
names = [column for column in df.columns if column.startswith('name_')]

# rewrite each array element as a struct that also carries its column name
expressions = []
for name in names:
    expressions.append(f.expr('TRANSFORM({name}, el -> STRUCT("{name}" AS name, el.date, el.val))'.format(name=name)))

# concatenate all arrays and explode them into one row per (name, date, val)
flatten_df = (df
              .withColumn('flatten', f.flatten(f.array(*expressions)))
              .selectExpr('id', 'inline(flatten)'))

# pivot the names back into columns, one value per (id, date)
output_df = (flatten_df
             .groupBy('id', 'date')
             .pivot('name', names)
             .agg(f.first('val')))

output_df.sort('id', 'date').show(truncate=False)
+---+----+--------------+--------+--------+--------+
|id |date|name_10000_xvz|name_1_a|name_1_b|name_2_a|
+---+----+--------------+--------+--------+--------+
|1 |2000|30 |null |null |null |
|1 |2001|31 |1 |4 |21 |
|1 |2002|32 |2 |5 |22 |
|1 |2003|33 |3 |6 |23 |
|2 |1990|39 |null |null |null |
|2 |2000|30 |null |null |null |
|2 |2001|31 |1 |4 |21 |
|2 |2002|32 |2 |5 |22 |
|2 |2003|33 |3 |6 |23 |
|2 |2004|34 |null |null |null |
+---+----+--------------+--------+--------+--------+
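The required output df shows 0 instead of null for the missing (id, date) combinations. If that matters, one option is to fill the pivoted columns afterwards (na.fill is standard PySpark API):

# optional: replace the nulls produced by the pivot with 0, as in the required output df
output_df = output_df.na.fill(0, subset=names)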
Assuming:

- the date values are always the same across all columns
- name_1_a, name_1_b, name_2_a have equal sizes

import pyspark.sql.functions as f
# walk the arrays by index and zip the i-th element of each column into one struct
output_df = (df
             .withColumn('flatten', f.expr('TRANSFORM(SEQUENCE(0, size(name_1_a) - 1), i -> '
                                           'STRUCT(name_1_a[i].date AS date, '
                                           '       name_1_a[i].val AS name_1_a, '
                                           '       name_1_b[i].val AS name_1_b, '
                                           '       name_2_a[i].val AS name_2_a))'))
             .selectExpr('id', 'inline(flatten)'))
output_df.sort('id', 'date').show(truncate=False)
+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1 |2001|1 |4 |21 |
|1 |2002|2 |5 |22 |
|1 |2003|3 |6 |23 |
|2 |2001|1 |4 |21 |
|2 |2002|2 |5 |22 |
|2 |2003|3 |6 |23 |
+---+----+--------+--------+--------+
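Under the same assumptions, f.arrays_zip (available since Spark 2.4) can express the same positional zip without writing the TRANSFORM expression by hand; a minimal sketch:

import pyspark.sql.functions as f

# zip the arrays element-wise, explode to one row per index,
# then take date from the first column and val from each
output_df = (df
             .select('id', f.explode(f.arrays_zip('name_1_a', 'name_1_b', 'name_2_a')).alias('z'))
             .select('id',
                     f.col('z.name_1_a.date').alias('date'),
                     f.col('z.name_1_a.val').alias('name_1_a'),
                     f.col('z.name_1_b.val').alias('name_1_b'),
                     f.col('z.name_2_a.val').alias('name_2_a')))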
Upvotes: 3
Reputation: 8711
How are the naming conventions used?
Can you try something like the below using spark-sql?
df.createOrReplaceTempView("df")
spark.sql("""
select id,
name_1_a.date[0] as date, name_1_a.val[0] as name_1_a, name_1_b.val[0] as name_1_b, name_2_a.val[0] as name_2_a
from df
""").show(false)
+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1 |2001|1 |4 |21 |
|2 |2001|1 |4 |21 |
+---+----+--------+--------+--------+
Here are my assumptions.
Building up the dataframe.
JSON strings
val jsonstr1 = """{ "id": 1, "name_1_a": [ { "date": 2001, "val": 1 }, { "date": 2002, "val": 2 }, { "date": 2003, "val": 3 } ], "name_1_b": [ { "date": 2001, "val": 4 }, { "date": 2002, "val": 5 }, { "date": 2003, "val": 6 } ], "name_2_a": [ { "date": 2001, "val": 21 }, { "date": 2002, "val": 22 }, { "date": 2003, "val": 23 } ]}"""
val jsonstr2 = """{ "id": 2, "name_1_a": [ { "date": 2001, "val": 1 }, { "date": 2002, "val": 2 }, { "date": 2003, "val": 3 } ], "name_1_b": [ { "date": 2001, "val": 4 }, { "date": 2002, "val": 5 }, { "date": 2003, "val": 6 } ], "name_2_a": [ { "date": 2001, "val": 21 }, { "date": 2002, "val": 22 }, { "date": 2003, "val": 23 } ]}"""
Dataframes
import spark.implicits._ // already in scope in spark-shell; needed in a compiled app for .toDS

val df1 = spark.read.json(Seq(jsonstr1).toDS)
val df2 = spark.read.json(Seq(jsonstr2).toDS)
val df = df1.union(df2)
Now create a view on top of df. I'm just naming it "df".
df.createOrReplaceTempView("df")
Show the data:
df.show(false)
df.printSchema
Use the metadata and construct the sql string.
df.columns
Array[String] = Array(id, name_1_a, name_1_b, name_2_a)
val names = df.columns.drop(1) // drop id
val sql1 = for { i <- 0 to 2
                 t1 = names.map(x => x + s".val[${i}] as ${x}").mkString(",")
                 t2 = names(0) + ".date[0] as date ," + t1
                 _ = println(t2)
               } yield s""" select id, ${t2} from df """
val sql2 = sql1.mkString(" union All ")
Now sql2 contains the string below, which is valid SQL:
" select id, name_1_a.date[0] as date ,name_1_a.val[0] as name_1_a,name_1_b.val[0] as name_1_b,name_2_a.val[0] as name_2_a from df union All select id, name_1_a.date[0] as date ,name_1_a.val[1] as name_1_a,name_1_b.val[1] as name_1_b,name_2_a.val[1] as name_2_a from df union All select id, name_1_a.date[0] as date ,name_1_a.val[2] as name_1_a,name_1_b.val[2] as name_1_b,name_2_a.val[2] as name_2_a from df "
Pass it to spark.sql(sql2) and get the required result
spark.sql(sql2).orderBy("id").show(false)
+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1 |2001|2 |5 |22 |
|1 |2001|1 |4 |21 |
|1 |2001|3 |6 |23 |
|2 |2001|1 |4 |21 |
|2 |2001|3 |6 |23 |
|2 |2001|2 |5 |22 |
+---+----+--------+--------+--------+
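Since the question is about PySpark, a rough port of the same index-based union might look like the following (hypothetical sketch; it assumes the view df registered above and that every array has exactly 3 elements, as in the sample data):

# build one select per array index and union them, mirroring the Scala loop
names = [c for c in df.columns if c != 'id']  # drop id

selects = []
for i in range(3):
    vals = ','.join('{0}.val[{1}] as {0}'.format(c, i) for c in names)
    selects.append('select id, {0}.date[0] as date, {1} from df'.format(names[0], vals))

sql2 = ' union all '.join(selects)
spark.sql(sql2).orderBy('id').show(truncate=False)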
Upvotes: 1