Snehasish Das

Reputation: 291

Read Pyspark Struct Json Column Non Required elements

I have a parquet file; one of its columns is a struct field that stores JSON. The struct is shown below.

originator: struct (nullable = true)
    |-- originatorDetail: struct (nullable = true)
    |    |-- applicationDeployedId: string (nullable = true)
    |    |-- applicationDeployedNameVersion: string (nullable = true)
    |    |-- applicationNameVersion: string (nullable = true)
    |    |-- cloudHost: string (nullable = true)
    |    |-- cloudRegion: string (nullable = true)
    |    |-- cloudStack: string (nullable = true)
    |    |-- version: string (nullable = true)
    |-- Orversion: string (nullable = true)

Only the version field is required in the JSON; the others are optional. So some records might have only two elements and still be valid.

Suppose I want to read the cloudHost field. I can read it as originator.originatorDetail.cloudHost, but for records where this optional field is not present, that fails because the element is not there. Is there any way I can read these optional values as null for records where they are not present, without using a UDF?

Some examples

"originator": {
    "originatorDetail": {
      "applicationDeployedId": "PSLV",
      "cloudRegion": "Mangal",
      "cloudHost": "Petrol",
      "applicationNameVersion": "CRDI",
      "applicationDeployedNameVersion": "Tuna",
      "cloudStack": "DEV",
      "version": "1.1.0"
    },
    "Orversion": "version.1"
  }
  -------------
"originator": {
    "originatorDetail": {
      "version": "1.1.0"
    },
    "Orversion": "version.1"
  }

Required Output

applicationDeployedId applicationDeployedNameVersion  applicationNameVersion cloudHost cloudRegion cloudStack version  Orversion
 PSLV                   Tuna                            CRDI                   Petrol    Mangal       DEV       1.1.0    version.1
                                                                                                                1.1.0    version.1

Upvotes: 3

Views: 615

Answers (1)

notNull

Reputation: 31540

Use the from_json function from Spark 2.4+.

Read the parquet data, then use from_json, passing a schema that matches your JSON column.

Spark reads the matching data and adds the non-matching fields with null values.

Example:

df.show(10,False)
#+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|id |json_data                                                                                                                                                                                                                                                       |
#+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|1  |{"originator": {"originatorDetail": {"applicationDeployedId": "PSLV","cloudRegion": "Mangal","cloudHost": "Petrol","applicationNameVersion": "CRDI","applicationDeployedNameVersion": "Tuna","cloudStack": "DEV","version": "1.1.0"},"Orversion": "version.1"}}|
#|2  |{"originator": {    "originatorDetail": {      "version": "1.1.0"    },    "Orversion": "version.1"}}                                                                                                                                                          |
#+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([StructField("originator", StructType([
            StructField("Orversion", StringType(), True),
            StructField("originatorDetail", StructType([
                StructField("applicationDeployedId", StringType(), True),
                StructField("applicationDeployedNameVersion", StringType(), True),
                StructField("applicationNameVersion", StringType(), True),
                StructField("cloudHost", StringType(), True),
                StructField("cloudRegion", StringType(), True),
                StructField("cloudStack", StringType(), True),
                StructField("version", StringType(), True)]), True)]), True)])

#then read the json_data column using from_json function
df.withColumn("json_converted",from_json(col("json_data"),schema)).select("id","json_converted").show(10,False)
#+---+--------------------------------------------------------+
#|id |json_converted                                          |
#+---+--------------------------------------------------------+
#|1  |[[version.1, [PSLV, Tuna,, Petrol, Mangal, DEV, 1.1.0]]]|
#|2  |[[version.1, [,,,,,, 1.1.0]]]                           |
#+---+--------------------------------------------------------+

df.withColumn("json_converted",from_json(col("json_data"),schema)).select("id","json_converted").printSchema()
#root
# |-- id: long (nullable = true)
# |-- json_converted: struct (nullable = true)
# |    |-- originator: struct (nullable = true)
# |    |    |-- Orversion: string (nullable = true)
# |    |    |-- originatorDetail: struct (nullable = true)
# |    |    |    |-- applicationDeployedId: string (nullable = true)
# |    |    |    |-- applicationDeployedNameVersion: string (nullable = true)
# |    |    |    |-- applicationNameVersion: string (nullable = true)
# |    |    |    |-- cloudHost: string (nullable = true)
# |    |    |    |-- cloudRegion: string (nullable = true)
# |    |    |    |-- cloudStack: string (nullable = true)
# |    |    |    |-- version: string (nullable = true)

#even though id=2 doesn't have all the fields, they are still added (as null)
df.withColumn("json_converted",from_json(col("json_data"),schema)).select("json_converted.originator.originatorDetail.applicationDeployedId").show(10,False)
#+---------------------+
#|applicationDeployedId|
#+---------------------+
#|PSLV                 |
#|null                 |
#+---------------------+

Upvotes: 2
