Reputation: 842
I am trying to create a dataframe from a nested JSON string and split it into multiple dataframes, i.e. the outer element data goes to one dataframe and the nested child data goes to another. There could be multiple nested elements. I looked at the other posts, but none of them provide a working sample for the scenario below. In this example the number of states is dynamic, and I want to store the country info and the state info in two separate HDFS folders. The parent dataframe holds a row like this:
val jsonStr="""{"country":"US","ISD":"001","states":[{"state1":"NJ","state2":"NY","state3":"PA"}]}"""
val countryDf = spark.read.json(Seq(jsonStr).toDS)
countryDf.show(false)
+---+-------+--------------+
|ISD|country|states |
+---+-------+--------------+
|001|US |[[NJ, NY, PA]]|
+---+-------+--------------+
countryDf.withColumn("states",explode($"states")).show(false)
val statesDf = countryDf.select(explode(countryDf("states").as("states")))
statesDf.show(false)
+------------+
|col |
+------------+
|[NJ, NY, PA]|
+------------+
Expected output: two dataframes.
countryDf
+---+-------+
|ISD|country|
+---+-------+
|001|US |
+---+-------+
statesDf
+-------+------+------+------+
|country|state1|state2|state3|
+-------+------+------+------+
|US     |NJ    |NY    |PA    |
+-------+------+------+------+
I looked at the other Stack Overflow questions about nested JSON flattening; none of them has a working solution for this.
Upvotes: 1
Views: 309
Reputation: 216
When you read nested JSON into a dataset, the nested part is stored as an array of structs. So the task is to explode the array and then flatten the resulting struct column.
import spark.implicits._
import org.apache.spark.sql.functions.explode

val jsonStr = """{"country":"US","ISD":"001","states":[{"state1":"NJ","state2":"NY","state3":"PA"}]}"""
val countryDf = spark.read.json(Seq(jsonStr).toDS)
countryDf.show(false)
+---+-------+--------------+
|ISD|country|states |
+---+-------+--------------+
|001|US |[[NJ, NY, PA]]|
+---+-------+--------------+
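Printing the schema makes the array-of-structs layout explicit; for the sample JSON it should look like this:
countryDf.printSchema()
// root
//  |-- ISD: string (nullable = true)
//  |-- country: string (nullable = true)
//  |-- states: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- state1: string (nullable = true)
//  |    |    |-- state2: string (nullable = true)
//  |    |    |-- state3: string (nullable = true)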
val countryDfExploded = countryDf.withColumn("states",explode($"states"))
countryDfExploded.show(false)
+---+-------+------------+
|ISD|country|states |
+---+-------+------------+
|001|US |[NJ, NY, PA]|
+---+-------+------------+
val countrySelectDf = countryDfExploded.select($"ISD", $"country")
countrySelectDf.show(false)
+---+-------+
|ISD|country|
+---+-------+
|001|US |
+---+-------+
val statesDf = countryDfExploded.select( $"country",$"states.*")
statesDf.show(false)
+-------+------+------+------+
|country|state1|state2|state3|
+-------+------+------+------+
|US |NJ |NY |PA |
+-------+------+------+------+
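The question also asks to persist the two dataframes to separate HDFS folders; a minimal sketch of that last step, with placeholder output paths:
countrySelectDf.write.mode("overwrite").json("hdfs:///output/country") // placeholder path
statesDf.write.mode("overwrite").json("hdfs:///output/states") // placeholder path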
Upvotes: 1
Reputation: 437
Here is a bit of code that does the job. Be mindful of performance if the number of columns is very large, since the distinct map keys are collected to the driver. I collect all the struct fields into a map column and then add each key back to the dataframe as its own column.
import scala.collection.mutable
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import spark.implicits._

val jsonStr = """{"country":"US","ISD":"001","states":[{"state1":"NJ","state2":"NY","state3":"PA"}]}"""
val countryDf = spark.read.json(Seq(jsonStr).toDS)
countryDf.show(false)
// Explode the array so each row carries one struct in a column named "states"
val statesDf = countryDf.select($"country", explode($"states").as("states"))
// Look up the struct's schema so its fields can be enumerated dynamically
val index = statesDf.schema.fieldIndex("states")
val stateSchema = statesDf.schema(index).dataType.asInstanceOf[StructType]
// Build alternating key/value columns for the map() function
val columns = mutable.LinkedHashSet[Column]()
stateSchema.fields.foreach(field => {
  columns.add(lit(field.name))
  columns.add(col("states." + field.name)) // the column is "states", not "state"
})
val s2 = statesDf.withColumn("statesMap", map(columns.toSeq: _*))
// Collect the distinct map keys and promote each one to its own column
val allMapKeys = s2.select(explode($"statesMap")).select($"key").distinct.collect().map(_.get(0).toString)
val s3 = allMapKeys.foldLeft(s2)((a, b) => a.withColumn(b, a("statesMap")(b)))
  .drop("statesMap")
s3.show(false)
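Note that s3 still carries the original states struct column alongside the flattened ones. Assuming only the flattened columns are wanted, a small follow-up; the output below is inferred from the sample data, and the order of the stateN columns may vary because distinct does not guarantee ordering:
val result = s3.drop("states")
result.show(false)
// +-------+------+------+------+
// |country|state1|state2|state3|
// +-------+------+------+------+
// |US     |NJ    |NY    |PA    |
// +-------+------+------+------+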
Upvotes: 1