Reputation: 291
I have a Parquet file; one of its columns is a struct field that stores JSON. The struct is shown below.
originator: struct (nullable = true)
|-- originatorDetail: struct (nullable = true)
| |-- applicationDeployedId: string (nullable = true)
| |-- applicationDeployedNameVersion: string (nullable = true)
| |-- applicationNameVersion: string (nullable = true)
| |-- cloudHost: string (nullable = true)
| |-- cloudRegion: string (nullable = true)
| |-- cloudStack: string (nullable = true)
| |-- version: string (nullable = true)
|-- Orversion: string (nullable = true)
Only the version field is required in the JSON; the other fields are optional. So some records might have only two elements and still be valid.
Suppose I want to read the cloudHost field. I can read it as originator.originatorDetail.cloudHost, but for records where this optional field is not present, the read fails because the element is not there. Is there any way to read these optional values as null for records where they are missing, without using a UDF?
Some examples
"originator": {
  "originatorDetail": {
    "applicationDeployedId": "PSLV",
    "cloudRegion": "Mangal",
    "cloudHost": "Petrol",
    "applicationNameVersion": "CRDI",
    "applicationDeployedNameVersion": "Tuna",
    "cloudStack": "DEV",
    "version": "1.1.0"
  },
  "Orversion": "version.1"
}
-------------
"originator": {
  "originatorDetail": {
    "version": "1.1.0"
  },
  "Orversion": "version.1"
}
Required Output
applicationDeployedId | applicationDeployedNameVersion | applicationNameVersion | cloudHost | cloudRegion | cloudStack | version | Orversion
PSLV                  | Tuna                           | CRDI                   | Petrol    | Mangal      | DEV        | 1.1.0   | version.1
null                  | null                           | null                   | null      | null        | null       | 1.1.0   | version.1
Upvotes: 3
Views: 615
Reputation: 31540
Use the from_json function (Spark 2.4+).
Read the Parquet data, then call from_json with a schema that matches your JSON column.
Spark populates the fields that are present in the data and fills the fields that are missing with null.
Example:
df.show(10,False)
#+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|id |json_data |
#+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|1 |{"originator": {"originatorDetail": {"applicationDeployedId": "PSLV","cloudRegion": "Mangal","cloudHost": "Petrol","applicationNameVersion": "CRDI","applicationDeployedNameVersion": "Tuna","cloudStack": "DEV","version": "1.1.0"},"Orversion": "version.1"}}|
#|2 |{"originator": { "originatorDetail": { "version": "1.1.0" }, "Orversion": "version.1"}} |
#+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#import the functions and types before building the schema
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType
#schema covering every field, required or optional; all fields are nullable
schema=StructType([StructField("originator",StructType([StructField("Orversion",StringType(),True),
StructField("originatorDetail",StructType([StructField("applicationDeployedId",StringType(),True),
StructField("applicationDeployedNameVersion",StringType(),True),
StructField("applicationNameVersion",StringType(),True),
StructField("cloudHost",StringType(),True),
StructField("cloudRegion",StringType(),True),
StructField("cloudStack",StringType(),True),
StructField("version",StringType(),True)]),True)]),True)])
#then read the json_data column using from_json function
df.withColumn("json_converted",from_json(col("json_data"),schema)).select("id","json_converted").show(10,False)
#+---+--------------------------------------------------------+
#|id |json_converted |
#+---+--------------------------------------------------------+
#|1 |[[version.1, [PSLV, Tuna, CRDI, Petrol, Mangal, DEV, 1.1.0]]]|
#|2 |[[version.1, [,,,,,, 1.1.0]]] |
#+---+--------------------------------------------------------+
df.withColumn("json_converted",from_json(col("json_data"),schema)).select("id","json_converted").printSchema()
#root
# |-- id: long (nullable = true)
# |-- json_converted: struct (nullable = true)
# | |-- originator: struct (nullable = true)
# | | |-- Orversion: string (nullable = true)
# | | |-- originatorDetail: struct (nullable = true)
# | | | |-- applicationDeployedId: string (nullable = true)
# | | | |-- applicationDeployedNameVersion: string (nullable = true)
# | | | |-- applicationNameVersion: string (nullable = true)
# | | | |-- cloudHost: string (nullable = true)
# | | | |-- cloudRegion: string (nullable = true)
# | | | |-- cloudStack: string (nullable = true)
# | | | |-- version: string (nullable = true)
#even though id=2 lacks most of the fields, from_json still adds them (as null)
df.withColumn("json_converted",from_json(col("json_data"),schema)).select("json_converted.originator.originatorDetail.applicationDeployedId").show(10,False)
#+---------------------+
#|applicationDeployedId|
#+---------------------+
#|PSLV |
#|null |
#+---------------------+
Upvotes: 2