Reputation: 109
I am trying to parse/flatten JSON data using the PySpark DataFrame API. The challenge is that I need to extract one data element ('Id') from the actual key/attribute name, and also keep only the rows that have a 'statC' attribute. To begin with, I am trying to explode the JSON object, but I am getting an error. Is there a better way to extract such data?
JSON input file:
{
"changes": {
"1": [
{
"Name": "ABC-1",
"statC": {
"newValue": 10
},
"column": {
"notDone": true,
"newStatus": "10071"
}
}
],
"2": [
{
"Name": "ABC-2",
"added": true
}
],
"3": [
{
"Name": "ABC-3",
"column": {
"notDone": true,
"newStatus": "10071"
}
}
],
"4": [
{
"Name": "ABC-4",
"statC": {
"newValue": 40
}
}
],
"5": [
{
"Name": "ABC-5",
"statC": {
"newValue": 50
},
"column": {
"notDone": false,
"done": true,
"newStatus": "13685"
}
}
],
"6": [
{
"Name": "ABC-61",
"added": true
},
{
"Name": "ABC-62",
"statC": {
"oldValue": 60
}
}
],
"7": [
{
"Name": "ABC-70",
"added": true
},
{
"Name": "ABC-71",
"statC": {
"newValue": 70
}
},
{
"Name": "ABC-72",
"statC": {
"newValue": 75
}
}
]
},
"startTime": 1666188060000,
"endTime": 1667347140000,
"activatedTime": 1666188126953,
"now": 1667294686212
}
Desired output:
Id  Name    statC_NewValue
1   ABC-1   10
4   ABC-4   40
5   ABC-5   50
7   ABC-71  70
7   ABC-72  75
My PySpark code:
from pyspark.sql.functions import *
rawDF = spark.read.json([f"abfss://{pADLSContainer}@{pADLSGen2}.dfs.core.windows.net/{pADLSDirectory}/InputFile.json"], multiLine=True)
idDF = rawDF.select(explode("changes").alias("changes_json"))
Error:
AnalysisException: cannot resolve 'explode(`changes`)' due to data type mismatch: input to function explode should be array or map type, not struct.
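The inferred schema explains the error: spark.read.json turns a JSON object with fixed keys into a StructType, so changes is a struct whose fields are "1" through "7", and explode only accepts array or map columns. A quick check against the rawDF above (the schema in the comment is roughly what Spark infers from the sample file):

rawDF.select("changes").printSchema()
# root
#  |-- changes: struct (nullable = true)
#  |    |-- 1: array (nullable = true)
#  |    |-- 2: array (nullable = true)
#  ...  (one struct field per numeric key, not a map)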
Upvotes: 1
Views: 817
Reputation: 24488
There's quite a lot to it. First, you can create a list of the columns (`1`, `2`, `3`, etc.) that satisfy your conditions, select them as columns, transform the arrays, unpivot, and expand into columns.
from pyspark.sql import functions as F

df_raw = spark.read.json(["file.json"], multiLine=True)

# Turn every key under "changes" ("1", "2", ...) into a top-level column.
df = df_raw.select('changes.*')

# Take the first array element of each column, purely to inspect the schema.
df0 = df.select([F.col(c)[0].alias(c) for c in df.columns])

# Keep only the columns whose element struct has a statC.newValue field.
cols = [c for c in df0.columns
        if 'statC' in df0.schema[c].dataType.names and
           'newValue' in df0.schema[c].dataType['statC'].dataType.names]

# Reduce every array element to a struct of just (Name, statC_NewValue).
# (The Python lambda form of transform requires Spark 3.1+.)
df = df.select(
    [F.transform(c, lambda x: F.struct(
        x.Name.alias('Name'),
        x.statC.newValue.alias('statC_NewValue'))).alias(c)
     for c in cols]
)

# Unpivot: stack turns every (column name, array) pair into a row.
to_melt = [f"'{c}', `{c}`" for c in df.columns]
df = df.selectExpr(f"stack({len(to_melt)}, {','.join(to_melt)}) (Id, val)")

# inline expands each array of structs into one row per struct.
df = df.selectExpr("Id", "inline(val)")

# Drop the entries that had no statC.newValue (e.g. ABC-70).
df = df.filter('statC_NewValue is not null')

df.show()
# +---+------+--------------+
# | Id| Name|statC_NewValue|
# +---+------+--------------+
# | 1| ABC-1| 10|
# | 4| ABC-4| 40|
# | 5| ABC-5| 50|
# | 7|ABC-71| 70|
# | 7|ABC-72| 75|
# +---+------+--------------+
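As to whether there's a better way: if you're willing to define the schema up front, you can declare changes as a map instead of letting Spark infer a struct, and then explode works exactly the way your original code intended. A minimal sketch, assuming the field names from the sample input (the local file name and the LongType for newValue are assumptions):

from pyspark.sql import functions as F
from pyspark.sql.types import (StructType, StructField, StringType,
                               LongType, ArrayType, MapType)

# Assumed element schema: only the fields needed for the final output.
change = StructType([
    StructField("Name", StringType()),
    StructField("statC", StructType([StructField("newValue", LongType())])),
])
schema = StructType([
    StructField("changes", MapType(StringType(), ArrayType(change))),
])

df = (
    spark.read.schema(schema).json("file.json", multiLine=True)
    # explode on a map produces one (key, value) row per entry.
    .select(F.explode("changes").alias("Id", "val"))
    # explode again: one row per change record inside each array.
    .select("Id", F.explode("val").alias("item"))
    .select("Id",
            F.col("item.Name").alias("Name"),
            F.col("item.statC.newValue").alias("statC_NewValue"))
    .filter("statC_NewValue is not null")
)
# Yields the same five rows as the table above (row order may differ).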
Upvotes: 2