Reputation: 555
I have an JSON input like this
{ "1": { "id": 1, "value": 5 }, "2": { "id": 2, "list": { "10": { "id": 10 }, "11": { "id": 11 }, "20": { "id": 20 } } }, "3": { "id": 3, "key": "a" } }
I need to merge the 3 columns and extract the needed values for each column, and this is the output I need:
{ "out": { "1": 5, "2": [10, 11, 20], "3": "a" } }
I tried to create a UDF to transform these 3 columns into 1, but I could not figure how to define MapType() with mixed value types - IntegerType()
, ArrayType(IntegerType())
and StringType()
respectively.
Thanks in advance!
Upvotes: 2
Views: 9913
Reputation: 10086
MapType()
is used for (key, value) pairs definitions not for nested data frames. What you're looking for is StructType()
You can load it directly using createDataFrame
but you'd have to pass a schema, so this way is easier:
import json
data_json = {
"1": {
"id": 1,
"value": 5
},
"2": {
"id": 2,
"list": {
"10": {
"id": 10
},
"11": {
"id": 11
},
"20": {
"id": 20
}
}
},
"3": {
"id": 3,
"key": "a"
}
}
a=[json.dumps(data_json)]
jsonRDD = sc.parallelize(a)
df = spark.read.json(jsonRDD)
df.printSchema()
root
|-- 1: struct (nullable = true)
| |-- id: long (nullable = true)
| |-- value: long (nullable = true)
|-- 2: struct (nullable = true)
| |-- id: long (nullable = true)
| |-- list: struct (nullable = true)
| | |-- 10: struct (nullable = true)
| | | |-- id: long (nullable = true)
| | |-- 11: struct (nullable = true)
| | | |-- id: long (nullable = true)
| | |-- 20: struct (nullable = true)
| | | |-- id: long (nullable = true)
|-- 3: struct (nullable = true)
| |-- id: long (nullable = true)
| |-- key: string (nullable = true)
Now to access nested dataframes. Note that column "2" is more nested than the other ones:
nested_cols = ["2"]
cols = ["1", "3"]
import pyspark.sql.functions as psf
df = df.select(
cols + [psf.array(psf.col(c + ".list.*")).alias(c) for c in nested_cols]
)
df = df.select(
[df[c].id.alias(c) for c in df.columns]
)
root
|-- 1: long (nullable = true)
|-- 3: long (nullable = true)
|-- 2: array (nullable = false)
| |-- element: long (containsNull = true)
It's not exactly your final output since you want it nested in an "out"
column:
import pyspark.sql.functions as psf
df.select(psf.struct("*").alias("out")).printSchema()
root
|-- out: struct (nullable = false)
| |-- 1: long (nullable = true)
| |-- 3: long (nullable = true)
| |-- 2: array (nullable = false)
| | |-- element: long (containsNull = true)
Finally back to JSON:
df.toJSON().first()
'{"1":1,"3":3,"2":[10,11,20]}'
Upvotes: 2
Reputation: 13926
You need to define resulting type of the UDF using StructType
, not the MapType
, like this:
from pyspark.sql.types import *
udf_result = StructType([
StructField('1', IntegerType()),
StructField('2', ArrayType(StringType())),
StructField('3', StringType())
])
Upvotes: 4