Kris

Reputation: 109

How to extract data from a JSON key/value pair when the key itself holds a needed value

I am trying to parse/flatten JSON data using the PySpark DataFrame API. The challenge is that I need to extract one data element ('Id') from the key/attribute itself, and also keep only the rows that have a 'statC' attribute. To start, I am trying to explode the JSON object, but I get an error. Is there a better way to extract such data?

JSON input file:

{
  "changes": {
    "1": [
      {
        "Name": "ABC-1",
        "statC": {
          "newValue": 10
        },
        "column": {
          "notDone": true,
          "newStatus": "10071"
        }
      }
    ],
    "2": [
      {
        "Name": "ABC-2",
        "added": true
      }
    ],
    "3": [
      {
        "Name": "ABC-3",
        "column": {
          "notDone": true,
          "newStatus": "10071"
        }
      }
    ],
    "4": [
      {
        "Name": "ABC-4",
        "statC": {
          "newValue": 40
        }
      }
    ],
    "5": [
      {
        "Name": "ABC-5",
        "statC": {
          "newValue": 50
        },
        "column": {
          "notDone": false,
          "done": true,
          "newStatus": "13685"
        }
      }
    ],
    "6": [
      {
        "Name": "ABC-61",
        "added": true
      },
      {
        "Name": "ABC-62",
        "statC": {
          "oldValue": 60
        }
      }
    ],
    "7": [
      {
        "Name": "ABC-70",
        "added": true
      },
      {
        "Name": "ABC-71",
        "statC": {
          "newValue": 70
        }
      },
      {
        "Name": "ABC-72",
        "statC": {
          "newValue": 75
        }
      }
    ]
  },
  "startTime": 1666188060000,
  "endTime": 1667347140000,
  "activatedTime": 1666188126953,
  "now": 1667294686212
}

Desired output:

Id  Name    statC_NewValue
1   ABC-1   10
4   ABC-4   40
5   ABC-5   50
7   ABC-71  70
7   ABC-72  75

My PySpark code:

from pyspark.sql.functions import *
rawDF = spark.read.json([f"abfss://{pADLSContainer}@{pADLSGen2}.dfs.core.windows.net/{pADLSDirectory}/InputFile.json"], multiLine=True)

idDF = rawDF.select(explode("changes").alias("changes_json"))

Error:

AnalysisException: cannot resolve 'explode(changes)' due to data type mismatch: input to function explode should be array or map type, not struct.
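For reference, this seems to happen because spark.read.json infers every JSON object as a struct, so 'changes' comes back as a struct with one field per key ("1", "2", ...) rather than as a map, and explode only accepts array or map input. The inferred schema looks roughly like this:

rawDF.printSchema()
# root
#  |-- changes: struct (nullable = true)
#  |    |-- 1: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  ...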

Upvotes: 1

Views: 817

Answers (1)

ZygD

Reputation: 24488

There's quite a lot to it. First, build a list of the keys (1, 2, 3, etc.) whose arrays satisfy your conditions, select only those columns, reshape each array element to the fields you need, then unpivot and expand the structs into columns.

from pyspark.sql import functions as F

df_raw = spark.read.json(["file.json"], multiLine=True)
df = df_raw.select('changes.*')  # one column per key: "1", "2", ..., "7"

# Index element 0 of every array so the schema exposes the element struct,
# then keep the keys whose element struct has a statC.newValue field.
df0 = df.select([F.col(c)[0].alias(c) for c in df.columns])
cols = [c for c in df0.columns
        if 'statC' in df0.schema[c].dataType.names and
           'newValue' in df0.schema[c].dataType['statC'].dataType.names]

# Trim each array element down to the two fields we need.
df = df.select(
    [F.transform(c, lambda x: F.struct(
        x.Name.alias('Name'),
        x.statC.newValue.alias('statC_NewValue'))).alias(c)
     for c in cols]
)

# Unpivot with stack() into (Id, val) rows, expand the array of structs
# with inline(), and drop the rows without a statC.newValue.
to_melt = [f"'{c}', `{c}`" for c in df.columns]
df = df.selectExpr(f"stack({len(to_melt)}, {','.join(to_melt)}) (Id, val)")
df = df.selectExpr("Id", "inline(val)")
df = df.filter('statC_NewValue is not null')

df.show()
# +---+------+--------------+
# | Id|  Name|statC_NewValue|
# +---+------+--------------+
# |  1| ABC-1|            10|
# |  4| ABC-4|            40|
# |  5| ABC-5|            50|
# |  7|ABC-71|            70|
# |  7|ABC-72|            75|
# +---+------+--------------+
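As a side note, since explode does accept maps, an alternative is to re-parse changes with an explicit map schema and skip the per-column work. This is just a sketch based on the sample file above (the DDL schema string is an assumption derived from it):

from pyspark.sql import functions as F

# Sketch, assuming the input shown in the question. Re-serialize the inferred
# 'changes' struct to JSON, re-parse it as a map so explode() applies; fields
# absent from the explicit schema (e.g. statC.oldValue) are simply dropped.
schema = "map<string, array<struct<Name:string, statC:struct<newValue:bigint>>>>"
df = (df_raw
      .select(F.explode(F.from_json(F.to_json("changes"), schema)).alias("Id", "val"))
      .selectExpr("Id", "inline(val)")  # expand the array of structs
      .filter("statC.newValue is not null")
      .select("Id", "Name", F.col("statC.newValue").alias("statC_NewValue")))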

Upvotes: 2
