Reputation: 7356
I have a Dataset with the schema below:
root
|-- collectorId: string (nullable = true)
|-- generatedAt: long (nullable = true)
|-- managedNeId: string (nullable = true)
|-- neAlert: struct (nullable = true)
| |-- advisory: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- headlineName: string (nullable = true)
| |-- fieldNotice: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- caveat: string (nullable = true)
| | | |-- distributionCode: string (nullable = true)
| |-- hwEoX: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- bulletinName: string (nullable = true)
| | | |-- equipmentType: string (nullable = true)
| |-- swEoX: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- bulletinHeadline: string (nullable = true)
| | | |-- equipmentType: string (nullable = true)
|-- partyId: string (nullable = true)
|-- recordType: string (nullable = true)
|-- sourceNeId: string (nullable = true)
|-- sourcePartyId: string (nullable = true)
|-- sourceSubPartyId: string (nullable = true)
|-- wfid: string (nullable = true)
I want to get the fields inside "element". To do this I exploded each array to flatten the structure:
Dataset<Row> alert = spark.read()
    .option("multiLine", true)
    .option("mode", "PERMISSIVE")
    .json("C:\\Users\\LearningAndDevelopment\\\\merge\\data1\\sample.json");

Seq<String> droppedColumns = scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("neAlert"));

Dataset<Row> alertjson = alert
    .withColumn("exploded_advisory", explode(col("neAlert.advisory")))
    .withColumn("exploded_fn", explode(col("neAlert.fieldNotice")))
    .withColumn("exploded_swEoX", explode(col("neAlert.swEoX")))
    .withColumn("exploded_hwEox", explode(col("neAlert.hwEoX")))
    .drop(droppedColumns);
alertjson.printSchema();
I got the final schema as below:
root
|-- collectorId: string (nullable = true)
|-- generatedAt: long (nullable = true)
|-- managedNeId: string (nullable = true)
|-- partyId: string (nullable = true)
|-- recordType: string (nullable = true)
|-- sourceNeId: string (nullable = true)
|-- sourcePartyId: string (nullable = true)
|-- sourceSubPartyId: string (nullable = true)
|-- wfid: string (nullable = true)
|-- exploded_advisory: struct (nullable = true)
| |-- equipmentType: string (nullable = true)
| |-- headlineName: string (nullable = true)
|-- exploded_fn: struct (nullable = true)
| |-- caveat: string (nullable = true)
| |-- distributionCode: string (nullable = true)
|-- exploded_swEoX: struct (nullable = true)
| |-- bulletinHeadline: string (nullable = true)
| |-- equipmentType: string (nullable = true)
|-- exploded_hwEox: struct (nullable = true)
| |-- bulletinName: string (nullable = true)
| |-- equipmentType: string (nullable = true)
But the above method created duplicate records, each flattened with the data from the first element of every JSON array. Each array can have many elements. How can I flatten the JSON arrays without losing data integrity?
Upvotes: 0
Views: 2452
Reputation: 23109
You can select the nested JSON with the . (dot) operator first and use explode for each nested field:
Dataset<Row> alertjson = alert
.withColumn("exploded_advisory", explode(col("neAlert.advisory")))
.withColumn("exploded_fn", explode(col("neAlert.fieldNotice")))
.withColumn("exploded_swEoX", explode(col("neAlert.swEoX")))
.withColumn("exploded_hwEox", explode(col("neAlert.hwEoX")));
If you want to explode each field individually, you have to explode them separately, which creates multiple DataFrames:
// for advisory
Dataset<Row> advisory = alert
    .withColumn("exploded_advisory", explode(col("neAlert.advisory")));

// for fieldNotice
Dataset<Row> fieldNotice = alert
    .withColumn("exploded_fn", explode(col("neAlert.fieldNotice")));
Drop the columns you no longer need and it should work.
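For example, a minimal sketch with the advisory DataFrame from above (the advisoryFlat name and the .* expansion are just one way to do it):
// drop the original nested column and expand the exploded struct into top-level fields
Dataset<Row> advisoryFlat = advisory
    .drop("neAlert")
    .select(col("*"), col("exploded_advisory.*"))
    .drop("exploded_advisory");
advisoryFlat.printSchema();
This leaves equipmentType and headlineName as top-level columns next to collectorId, managedNeId and the other root fields.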
Upvotes: 1