aj2713

Reputation: 21

PySpark: flatten an array and explode a struct to get the desired output

I have data with the schema below. The indexes column is a struct; each of its fields is an array, and each array element is itself a struct:

root
 |-- id_num: string (nullable = true)
 |-- indexes: struct (nullable = true)
 |    |-- customer_rating: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
 |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- version: string (nullable = true)
 |    |-- reputation: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
 |    |    |    |-- low_value_reason: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- version: string (nullable = true)
 |    |-- visibility: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
 |    |    |    |-- low_value_reason: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- version: string (nullable = true)

I want to transform it into the schema below, with the data values placed in the corresponding columns:

root
 |-- id_num: string (nullable = true)
 |-- indexes_type: string (nullable = true)    --> this field holds the field names of the indexes struct, one per row
 |-- data_sufficiency_indicator: boolean (nullable = true)
 |-- value: double (nullable = true)
 |-- version: string (nullable = true)
 |-- low_value_reason: string (nullable = true)  --> each element in the array becomes a new row

Here is the sample input data in json format:

{"id_num":"1234","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":2.16,"low_value_reason":["low scores from reviews_and_visits","low scores from online_presence"]}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}],"reputation":[{"version":"2.0","data_sufficiency_indicator":false}]}}
{"data_id":"5678","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":2.71,"low_value_reason":["low scores from reviews_and_visits","low scores from online_presence"]}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}]}}
{"data_id":"9876","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":3.06}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}],"reputation":[{"version":"2.0","data_sufficiency_indicator":false}]}}

Expected Output

id_num | indexes_type    | version | data_sufficiency_indicator | value | low_value_reason
==========================================================================================================
9999   | visibility      | 2.0     | true                       | 2.16  | low scores from reviews_and_visits
9999   | visibility      | 2.0     | true                       | 2.16  | low scores from online_presence
9999   | customer_rating | 2.0     | false                      |       |
9999   | reputation      | 2.0     | false                      |       |
8888   | visibility      | 2.0     | true                       | 2.71  | low scores from reviews_and_visits
8888   | visibility      | 2.0     | true                       | 2.71  | low scores from online_presence
8888   | customer_rating | 2.0     | false                      |       |
7898   | visibility      | 2.0     | true                       | 3.06  |
7898   | customer_rating | 2.0     | false                      |       |
7898   | reputation      | 2.0     | false                      |       |

Any help with this use case is much appreciated. Also, is it possible to get this output without hardcoding the field names of the indexes struct in the code? There may be more of them than appear in this example.

Upvotes: 1

Views: 708

Answers (1)

jxc

Reputation: 13998

You can load the indexes column as a MapType instead of a StructType by passing an explicit schema to spark.read.json(). The field names then become map keys, so they do not need to be hardcoded:

schema = "id_num string,indexes map<string,array<struct<data_sufficiency_indicator:boolean,low_value_reason:array<string>,value:double,version:string>>>"

df = spark.read.json("/path/to/jsons", schema=schema)

df.printSchema()
root
 |-- id_num: string (nullable = true)
 |-- indexes: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
 |    |    |    |-- low_value_reason: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- version: string (nullable = true)

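Then apply explode_outer and inline_outer in successive selectExpr calls to get the desired result. The first call turns each map entry into a row, the second expands each struct in the array into columns, and the third turns each low_value_reason element into a row: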

df_new = df.selectExpr("id_num", "explode_outer(indexes) as (indexes_type, vals)") \
    .selectExpr("*", "inline_outer(vals)") \
    .selectExpr(
        "id_num",
        "indexes_type",
        "version",
        "data_sufficiency_indicator",
        "value",
        "explode_outer(low_value_reason) as low_value_reason"
    )

df_new.show(truncate=False)
+------+---------------+-------+--------------------------+-----+----------------------------------+
|id_num|indexes_type   |version|data_sufficiency_indicator|value|low_value_reason                  |
+------+---------------+-------+--------------------------+-----+----------------------------------+
|1234  |visibility     |2.0    |true                      |2.16 |low scores from reviews_and_visits|
|1234  |visibility     |2.0    |true                      |2.16 |low scores from online_presence   |
|1234  |customer_rating|2.0    |false                     |null |null                              |
|1234  |reputation     |2.0    |false                     |null |null                              |
|5678  |visibility     |2.0    |true                      |2.71 |low scores from reviews_and_visits|
|5678  |visibility     |2.0    |true                      |2.71 |low scores from online_presence   |
|5678  |customer_rating|2.0    |false                     |null |null                              |
|9876  |visibility     |2.0    |true                      |3.06 |null                              |
|9876  |customer_rating|2.0    |false                     |null |null                              |
|9876  |reputation     |2.0    |false                     |null |null                              |
+------+---------------+-------+--------------------------+-----+----------------------------------+
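
To make the row-multiplying behaviour of the three steps easier to follow, here is a plain-Python sketch (no Spark involved) of what they do to a single record. The helper name flatten_record is hypothetical and only approximates the semantics of explode_outer and inline_outer, including the way the _outer variants keep a row of nulls when a map or array is missing or empty.

```python
def flatten_record(record):
    """Approximate explode_outer(map) -> inline_outer(array) -> explode_outer(array)."""
    fields = ("version", "data_sufficiency_indicator", "value")
    rows = []
    # Step 1, explode_outer on the map: one row per key,
    # or a single row of nulls when the map is missing or empty.
    entries = list((record.get("indexes") or {}).items()) or [(None, None)]
    for indexes_type, structs in entries:
        # Step 2, inline_outer: one row per struct in the array,
        # or a single row of nulls when the array is missing or empty.
        for struct in (structs or [None]):
            struct = struct or {}
            base = (record.get("id_num"), indexes_type) + tuple(struct.get(f) for f in fields)
            # Step 3, explode_outer on low_value_reason: one row per reason,
            # or a single null when the array is missing or empty.
            for reason in (struct.get("low_value_reason") or [None]):
                rows.append(base + (reason,))
    return rows


# First record from the sample JSON in the question.
record = {
    "id_num": "1234",
    "indexes": {
        "visibility": [{
            "version": "2.0",
            "data_sufficiency_indicator": True,
            "value": 2.16,
            "low_value_reason": [
                "low scores from reviews_and_visits",
                "low scores from online_presence",
            ],
        }],
        "customer_rating": [{"version": "2.0", "data_sufficiency_indicator": False}],
        "reputation": [{"version": "2.0", "data_sufficiency_indicator": False}],
    },
}

rows = flatten_record(record)
for row in rows:
    print(row)
```

The record yields four rows: two for visibility (one per low_value_reason) and one each for customer_rating and reputation, matching the first four rows of the table above.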

By the way, I changed data_id to id_num in your sample JSON, assuming it was a typo. If it was not, add data_id string to the schema and use coalesce(id_num, data_id) to build the final id_num column.

Alternatively, you can load the dataframe without specifying a schema and then convert the indexes column with the from_json/to_json functions; see a similar example here.

Upvotes: 1
