wandermonk

Reputation: 7356

Iterate and flatten an Array of Struct types in a Dataset using Apache Spark (Java)

I have a Dataset with the following schema:

 root
 |-- collectorId: string (nullable = true)
 |-- generatedAt: long (nullable = true)
 |-- managedNeId: string (nullable = true)
 |-- neAlert: struct (nullable = true)
 |    |-- advisory: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |    |    |    |-- headlineName: string (nullable = true)
 |    |-- fieldNotice: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- caveat: string (nullable = true)
 |    |    |    |-- distributionCode: string (nullable = true)
 |    |-- hwEoX: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- bulletinName: string (nullable = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |    |-- swEoX: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- bulletinHeadline: string (nullable = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |-- partyId: string (nullable = true)
 |-- recordType: string (nullable = true)
 |-- sourceNeId: string (nullable = true)
 |-- sourcePartyId: string (nullable = true)
 |-- sourceSubPartyId: string (nullable = true)
 |-- wfid: string (nullable = true)

I want to get the fields inside each "element" struct. To do this, I used explode on the arrays to flatten them.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;

Dataset<Row> alert = spark.read()
    .option("multiLine", true)
    .option("mode", "PERMISSIVE")
    .json("C:\\Users\\LearningAndDevelopment\\merge\\data1\\sample.json");

// Dataset.drop accepts column names directly, so no Scala Seq conversion is needed
Dataset<Row> alertjson = alert
    .withColumn("exploded_advisory", explode(col("neAlert.advisory")))
    .withColumn("exploded_fn", explode(col("neAlert.fieldNotice")))
    .withColumn("exploded_swEoX", explode(col("neAlert.swEoX")))
    .withColumn("exploded_hwEox", explode(col("neAlert.hwEoX")))
    .drop("neAlert");

alertjson.printSchema();

The resulting schema is:

root
 |-- collectorId: string (nullable = true)
 |-- generatedAt: long (nullable = true)
 |-- managedNeId: string (nullable = true)
 |-- partyId: string (nullable = true)
 |-- recordType: string (nullable = true)
 |-- sourceNeId: string (nullable = true)
 |-- sourcePartyId: string (nullable = true)
 |-- sourceSubPartyId: string (nullable = true)
 |-- wfid: string (nullable = true)
 |-- exploded_advisory: struct (nullable = true)
 |    |-- equipmentType: string (nullable = true)
 |    |-- headlineName: string (nullable = true)
 |-- exploded_fn: struct (nullable = true)
 |    |-- caveat: string (nullable = true)
 |    |-- distributionCode: string (nullable = true)
 |-- exploded_swEoX: struct (nullable = true)
 |    |-- bulletinHeadline: string (nullable = true)
 |    |-- equipmentType: string (nullable = true)
 |-- exploded_hwEox: struct (nullable = true)
 |    |-- bulletinName: string (nullable = true)
 |    |-- equipmentType: string (nullable = true)

However, the code above produced many duplicate records, flattened with the data from the first element of each JSON array. Each array can contain many elements. How can I flatten the JSON arrays without losing data integrity?

Upvotes: 0

Views: 2452

Answers (1)

koiralo

Reputation: 23109

You can select the nested JSON fields with the dot (.) operator and then apply explode to each nested array:

Dataset<Row> alertjson = alert
    .withColumn("exploded_advisory", explode(col("neAlert.advisory")))
    .withColumn("exploded_fn", explode(col("neAlert.fieldNotice")))
    .withColumn("exploded_swEoX", explode(col("neAlert.swEoX")))
    .withColumn("exploded_hwEox", explode(col("neAlert.hwEoX")));
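Be aware that chaining explode calls on the same row multiplies the rows: explode emits one output row per array element and copies every other column, so each further explode repeats the existing rows once per element of the next array. With four arrays of lengths a, b, c and d, one input record becomes a × b × c × d rows, which is where the apparent duplicates come from. The sketch below models this with plain Java lists rather than Spark; the explode helper and the array contents are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class ChainedExplodeDemo {

    // Hypothetical stand-in for Spark's explode: for every input row,
    // emit one copy of that row per element of `array`, with the element appended.
    static List<List<String>> explode(List<List<String>> rows, List<String> array) {
        List<List<String>> out = new ArrayList<>();
        for (List<String> row : rows) {
            for (String element : array) {
                List<String> copy = new ArrayList<>(row);
                copy.add(element);
                out.add(copy);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // One input record; "record-1" stands in for collectorId and the other scalar columns.
        List<List<String>> rows = new ArrayList<>();
        rows.add(new ArrayList<>(List.of("record-1")));

        // Made-up contents: 3 advisories, 2 field notices, 2 swEoX, 2 hwEoX entries.
        rows = explode(rows, List.of("adv1", "adv2", "adv3"));
        rows = explode(rows, List.of("fn1", "fn2"));
        rows = explode(rows, List.of("sw1", "sw2"));
        rows = explode(rows, List.of("hw1", "hw2"));

        // Every advisory is repeated once per combination of the other arrays' elements.
        long adv1Rows = rows.stream().filter(r -> r.contains("adv1")).count();
        System.out.println(rows.size() + " rows, adv1 appears in " + adv1Rows);
        // prints "24 rows, adv1 appears in 8"
    }
}
```

The row count grows with the product of the array lengths, not their sum, so this gets expensive quickly on records with many alerts.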

If you want each array exploded independently, you have to explode each one separately, which creates multiple DataFrames:

// for advisory
Dataset<Row> advisory = alert
    .withColumn("exploded_advisory", explode(col("neAlert.advisory")));

// for fieldNotice
Dataset<Row> fieldNotice = alert
    .withColumn("exploded_fn", explode(col("neAlert.fieldNotice")));

Then drop the columns you don't need, and it should work.
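Exploding one array at a time keeps each result at one row per element of that array, and keeping a parent key such as collectorId in each result preserves which record every element came from. In Spark, the flattening step would be exploding neAlert.advisory and then selecting the key column together with exploded_advisory.* to turn the struct fields into top-level columns. The plain-Java model below illustrates the same idea for the advisory array only; the Advisory class, the flattenAdvisory helper and the sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SeparateExplodeDemo {

    // Hypothetical stand-in for one element of neAlert.advisory.
    static final class Advisory {
        final String equipmentType;
        final String headlineName;

        Advisory(String equipmentType, String headlineName) {
            this.equipmentType = equipmentType;
            this.headlineName = headlineName;
        }
    }

    // Models "explode only this array, keep the key plus the struct's fields, drop the rest":
    // one flat row [collectorId, equipmentType, headlineName] per advisory element.
    static List<String[]> flattenAdvisory(String collectorId, List<Advisory> advisories) {
        List<String[]> out = new ArrayList<>();
        for (Advisory a : advisories) {
            out.add(new String[] {collectorId, a.equipmentType, a.headlineName});
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample data for a single record.
        List<Advisory> advisories = List.of(
            new Advisory("router", "headline-A"),
            new Advisory("switch", "headline-B"),
            new Advisory("router", "headline-C"));

        // Exactly one row per advisory element, regardless of how many
        // fieldNotice, swEoX or hwEoX elements the same record has.
        List<String[]> rows = flattenAdvisory("collector-1", advisories);
        System.out.println(rows.size());
        for (String[] row : rows) {
            System.out.println(String.join(",", row));
        }
    }
}
```

The same pattern applies to fieldNotice, swEoX and hwEoX, each producing its own flat table keyed by the parent record.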

Upvotes: 1
