Reputation: 113
I want to separate a string of JSONs in my dataframe column into multiple rows in PySpark. Example:
Input:
id | addresses |
---|---|
1 | [{"city":null,"state":null,"street":"123, ABC St, ABC Square","postalCode":"11111","country":"USA"},{"city":"Dallas","state":"TX","street":"456, DEF Plaza, Test St","postalCode":"99999","country":"USA"}] |
Expected output:
id | addresses |
---|---|
1 | {"city":null,"state":null,"street":"123, ABC St, ABC Square","postalCode":"11111","country":"USA"} |
1 | {"city":"Dallas","state":"TX","street":"456, DEF Plaza, Test St","postalCode":"99999","country":"USA"} |
Any ideas how to do this?
Upvotes: 1
Views: 670
Reputation: 3639
Looking at the example in your question, it is not clear what the type of the addresses column is, nor what type you need in the output column. So, let's explore the different combinations.

The addresses column is of type ArrayType: in this case, you can use explode (assuming from pyspark.sql import functions as F):

df.select('id', F.explode('addresses').alias('address'))
The result is:
+---+-----------------------------------------------------------------------------------------------------+
|id |address |
+---+-----------------------------------------------------------------------------------------------------+
|1 |{country -> USA, state -> null, city -> null, street -> 123, ABC St, ABC Square, postalCode -> 11111}|
|1 |{country -> USA, state -> TX, city -> Dallas, street -> 456, DEF Plaza, Test St, postalCode -> 99999}|
+---+-----------------------------------------------------------------------------------------------------+
The type of the output column will be the same as the type of the items in the input column.

The addresses column is an Array of StringType, but you want your output to be a StructType: in this case, you can convert each string into a struct using from_json:

from pyspark.sql import functions as F, SparkSession, types as T
json_schema = T.StructType([
T.StructField("city", T.StringType()),
T.StructField("state", T.StringType()),
T.StructField("street", T.StringType()),
T.StructField("postalCode", T.StringType()),
T.StructField("country", T.StringType()),
])
df_struct_from_array = (
df
.withColumn('address', F.explode('addresses'))
.select('id', F.from_json('address', json_schema).alias('address'))
)
The following dataframe is the result:
+---+-------------------------------------------------+
|id |address |
+---+-------------------------------------------------+
|1 |{null, null, 123, ABC St, ABC Square, 11111, USA}|
|1 |{Dallas, TX, 456, DEF Plaza, Test St, 99999, USA}|
+---+-------------------------------------------------+
The schema of df_struct_from_array is:
root
|-- id: long (nullable = true)
|-- address: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- state: string (nullable = true)
| |-- street: string (nullable = true)
| |-- postalCode: string (nullable = true)
| |-- country: string (nullable = true)
The addresses column is of StringType and you want a StructType column in output: in this case, you have to convert from JSON first and then explode:

json_schema = T.ArrayType(T.StructType([
T.StructField("city", T.StringType()),
T.StructField("state", T.StringType()),
T.StructField("street", T.StringType()),
T.StructField("postalCode", T.StringType()),
T.StructField("country", T.StringType()),
]))
df_struct_from_str = (
df
.withColumn('addresses_conv', F.from_json('addresses', json_schema))
.select('id', F.explode('addresses_conv').alias('address'))
)
This is what you get:
+---+-------------------------------------------------+
|id |address |
+---+-------------------------------------------------+
|1 |{null, null, 123, ABC St, ABC Square, 11111, USA}|
|1 |{Dallas, TX, 456, DEF Plaza, Test St, 99999, USA}|
+---+-------------------------------------------------+
The schema of df_struct_from_str is:
root
|-- id: long (nullable = true)
|-- address: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- state: string (nullable = true)
| |-- street: string (nullable = true)
| |-- postalCode: string (nullable = true)
| |-- country: string (nullable = true)
Upvotes: 1