I have data with the schema below: the indexes attribute is a struct, each of its fields is an array, and each array element is itself a struct.
root
|-- id_num: string (nullable = true)
|-- indexes: struct (nullable = true)
| |-- customer_rating: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- data_sufficiency_indicator: boolean (nullable = true)
| | | |-- value: double (nullable = true)
| | | |-- version: string (nullable = true)
| |-- reputation: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- data_sufficiency_indicator: boolean (nullable = true)
| | | |-- low_value_reason: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- value: double (nullable = true)
| | | |-- version: string (nullable = true)
| |-- visibility: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- data_sufficiency_indicator: boolean (nullable = true)
| | | |-- low_value_reason: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- value: double (nullable = true)
| | | |-- version: string (nullable = true)
I want to transform this into the schema below, with the data values in the corresponding columns:
root
|-- id_num: string (nullable = true)
|-- indexes_type: string (nullable = true) --> holds the name of each field of the indexes struct, one row per field
|-- data_sufficiency_indicator: boolean (nullable = true)
|-- value: double (nullable = true)
|-- version: string (nullable = true)
|-- low_value_reason: string (nullable = true) --> each element in the array becomes a new row
Here is sample input data in JSON format:
{"id_num":"1234","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":2.16,"low_value_reason":["low scores from reviews_and_visits","low scores from online_presence"]}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}],"reputation":[{"version":"2.0","data_sufficiency_indicator":false}]}}
{"data_id":"5678","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":2.71,"low_value_reason":["low scores from reviews_and_visits","low scores from online_presence"]}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}]}}
{"data_id":"9876","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":3.06}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}],"reputation":[{"version":"2.0","data_sufficiency_indicator":false}]}}
Expected output:
id_num | indexes_type    | version | data_sufficiency_indicator | value | low_value_reason
=========================================================================================
9999   | visibility      | 2.0     | true                       | 2.16  | low scores from reviews_and_visits
9999   | visibility      | 2.0     | true                       | 2.16  | low scores from online_presence
9999   | customer_rating | 2.0     | false                      |       |
9999   | reputation      | 2.0     | false                      |       |
8888   | visibility      | 2.0     | true                       | 2.71  | low scores from reviews_and_visits
8888   | visibility      | 2.0     | true                       | 2.71  | low scores from online_presence
8888   | customer_rating | 2.0     | false                      |       |
7898   | visibility      | 2.0     | true                       | 3.06  |
7898   | customer_rating | 2.0     | false                      |       |
7898   | reputation      | 2.0     | false                      |       |
Any help with this use case is much appreciated. Also, is it possible to get this output without hardcoding the struct field names in the code, since there can be more of them than in this example?
You can read the column indexes as a MapType instead of a StructType by explicitly specifying the schema when loading the dataframe with spark.read.json(), see below:
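# Declare indexes as a map so its field names (visibility, reputation, ...) become map keys instead of fixed columns
schema = "id_num string,indexes map<string,array<struct<data_sufficiency_indicator:boolean,low_value_reason:array<string>,value:double,version:string>>>"
df = spark.read.json("/path/to/jsons", schema=schema)
df.printSchema()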
root
|-- id_num: string (nullable = true)
|-- indexes: map (nullable = true)
| |-- key: string
| |-- value: array (valueContainsNull = true)
| | |-- element: struct (containsNull = true)
| | | |-- data_sufficiency_indicator: boolean (nullable = true)
| | | |-- low_value_reason: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- value: double (nullable = true)
| | | |-- version: string (nullable = true)
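Why the map type helps with the "no hardcoding" requirement can be seen in a plain-Python analogy (this is not Spark code): once indexes is treated as a mapping, the index names are keys you iterate over rather than columns you have to name. The function index_types and the extra popularity key below are hypothetical, for illustration only.

```python
import json

# First sample line from the question, unchanged.
LINE = '{"id_num":"1234","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":2.16,"low_value_reason":["low scores from reviews_and_visits","low scores from online_presence"]}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}],"reputation":[{"version":"2.0","data_sufficiency_indicator":false}]}}'


def index_types(record):
    """Return the index names found under 'indexes', whatever they are."""
    # 'indexes' is handled as a mapping, so its field names are data (keys),
    # not part of the code; a missing or null 'indexes' gives an empty list.
    return list((record.get("indexes") or {}).keys())


record = json.loads(LINE)
print(index_types(record))  # ['visibility', 'customer_rating', 'reputation']

# A hypothetical new index type is picked up without any code change.
record["indexes"]["popularity"] = [{"version": "2.0", "data_sufficiency_indicator": False}]
print(index_types(record))  # ['visibility', 'customer_rating', 'reputation', 'popularity']
```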
Then flatten it with selectExpr, applying explode_outer and inline_outer in successive steps to get the desired result:
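df_new = (
    df
    # one row per map entry: the key becomes indexes_type, the array of structs becomes vals
    .selectExpr("id_num", "explode_outer(indexes) as (indexes_type, vals)")
    # one row per struct in vals, with the struct fields expanded into columns
    .selectExpr("*", "inline_outer(vals)")
    # one row per low_value_reason element (a single null row when the array is missing)
    .selectExpr(
        "id_num",
        "indexes_type",
        "version",
        "data_sufficiency_indicator",
        "value",
        "explode_outer(low_value_reason) as low_value_reason",
    )
)
df_new.show(truncate=False)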
+------+---------------+-------+--------------------------+-----+----------------------------------+
|id_num|indexes_type |version|data_sufficiency_indicator|value|low_value_reason |
+------+---------------+-------+--------------------------+-----+----------------------------------+
|1234 |visibility |2.0 |true |2.16 |low scores from reviews_and_visits|
|1234 |visibility |2.0 |true |2.16 |low scores from online_presence |
|1234 |customer_rating|2.0 |false |null |null |
|1234 |reputation |2.0 |false |null |null |
|5678 |visibility |2.0 |true |2.71 |low scores from reviews_and_visits|
|5678 |visibility |2.0 |true |2.71 |low scores from online_presence |
|5678 |customer_rating|2.0 |false |null |null |
|9876 |visibility |2.0 |true |3.06 |null |
|9876 |customer_rating|2.0 |false |null |null |
|9876 |reputation |2.0 |false |null |null |
+------+---------------+-------+--------------------------+-----+----------------------------------+
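To see what each selectExpr step does row by row, here is a plain-Python sketch that emulates (it is not how Spark executes it) explode_outer on the map, inline_outer on the array of structs, and explode_outer on low_value_reason, applied to the three sample lines with data_id changed to id_num as above. The helper names are hypothetical; only the struct field names are listed, while the index types come from the data itself.

```python
import json

# The question's sample lines, with data_id changed to id_num as in the answer.
LINES = [
    '{"id_num":"1234","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":2.16,"low_value_reason":["low scores from reviews_and_visits","low scores from online_presence"]}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}],"reputation":[{"version":"2.0","data_sufficiency_indicator":false}]}}',
    '{"id_num":"5678","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":2.71,"low_value_reason":["low scores from reviews_and_visits","low scores from online_presence"]}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}]}}',
    '{"id_num":"9876","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":3.06}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}],"reputation":[{"version":"2.0","data_sufficiency_indicator":false}]}}',
]

FIELDS = ["version", "data_sufficiency_indicator", "value", "low_value_reason"]


def explode_outer_map(m):
    """Like explode_outer on a map: one (key, value) per entry, or (None, None) if null/empty."""
    if not m:
        yield None, None
    else:
        yield from m.items()


def inline_outer(structs, fields):
    """Like inline_outer: one dict per struct, or a row of nulls if the array is null/empty."""
    if not structs:
        yield dict.fromkeys(fields)
    else:
        for s in structs:
            # Missing struct fields (or a null struct) become None, like nulls in Spark.
            yield {f: (s or {}).get(f) for f in fields}


def explode_outer(values):
    """Like explode_outer on an array: one row per element, or a single None if null/empty."""
    if not values:
        yield None
    else:
        yield from values


def flatten(line):
    rec = json.loads(line)
    for indexes_type, vals in explode_outer_map(rec.get("indexes")):
        for s in inline_outer(vals, FIELDS):
            for reason in explode_outer(s["low_value_reason"]):
                yield (rec.get("id_num"), indexes_type, s["version"],
                       s["data_sufficiency_indicator"], s["value"], reason)


rows = [row for line in LINES for row in flatten(line)]
for row in rows:
    print(row)
```

The printed rows line up with the df_new.show() output above, ten rows in total.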
BTW, I changed data_id to id_num in your sample JSON, since I assume it is a typo. If not, just add data_id string to the schema and then use coalesce(id_num, data_id) to get the final id_num column.
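coalesce returns its first non-null argument, so each row takes id_num when it is present and falls back to data_id otherwise. A plain-Python sketch of that rule on the sample records, trimmed to just their id keys (the coalesce helper here is a hypothetical stand-in for the Spark SQL function, not the function itself):

```python
import json

# The question's original sample lines, trimmed to the id keys only:
# the first record uses "id_num", the other two use "data_id".
LINES = ['{"id_num":"1234"}', '{"data_id":"5678"}', '{"data_id":"9876"}']


def coalesce(*values):
    """Return the first argument that is not None, mirroring SQL coalesce()."""
    for v in values:
        if v is not None:
            return v
    return None


# A missing key reads as None, just as an absent JSON field reads as null in Spark.
ids = [coalesce(r.get("id_num"), r.get("data_id")) for r in map(json.loads, LINES)]
print(ids)  # ['1234', '5678', '9876']
```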
Alternatively, you can load the dataframe without specifying a schema and then use the to_json/from_json functions to convert the indexes struct into a map; see a similar example here.