Yogen Rai

Reputation: 3033

Convert nested json string in Dataset to Dataset/Dataframe in Spark Scala

I have a simple program with a Dataset whose resource_serialized column holds a JSON string as its value, as shown below:

import org.apache.spark.SparkConf

object TestApp {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("Loading Data").setMaster("local[*]")

    val spark = org.apache.spark.sql.SparkSession
      .builder
      .config(sparkConf)
      .appName("Test")
      .getOrCreate()

    val json = "[{\"resource_serialized\":\"{\\\"createdOn\\\":\\\"2000-07-20 00:00:00.0\\\",\\\"genderCode\\\":\\\"0\\\"}\",\"id\":\"00529e54-0f3d-4c76-9d3\"}]"

    import spark.implicits._
    val df = spark.read.json(Seq(json).toDS)
    df.printSchema()
    df.show()
  }
}

Schema printed is:

root
 |-- id: string (nullable = true)
 |-- resource_serialized: string (nullable = true)

Dataset printed on the console is:

+--------------------+--------------------+
|                  id| resource_serialized|
+--------------------+--------------------+
|00529e54-0f3d-4c7...|{"createdOn":"200...|
+--------------------+--------------------+

The resource_serialized field contains a JSON string (as seen in the debug console; the screenshot is not available).

Now I need to create a Dataset/DataFrame from that JSON string. How can I achieve this?

My goal is to get a Dataset like this:

+--------------------+--------------------+----------+
|                  id|           createdOn|genderCode|
+--------------------+--------------------+----------+
|00529e54-0f3d-4c7...|2000-07-20 00:00    |         0|
+--------------------+--------------------+----------+

Upvotes: 1

Views: 1271

Answers (2)

QuickSilver

Reputation: 4045

The solution below parses resource_serialized into a map of (String, String) key-value pairs, collects the distinct keys, and then selects each key as its own column.

import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}

object TestApp {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("Loading Data").setMaster("local[*]")

    val spark = org.apache.spark.sql.SparkSession
      .builder
      .config(sparkConf)
      .appName("Test")
      .getOrCreate()

    val json = "[{\"resource_serialized\":\"{\\\"createdOn\\\":\\\"2000-07-20 00:00:00.0\\\",\\\"genderCode\\\":\\\"0\\\"}\",\"id\":\"00529e54-0f3d-4c76-9d3\"}]"

    import spark.implicits._
    val df = spark.read.json(Seq(json).toDS)
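    // Parse the JSON string into a Map[String, String] column
    val jsonColumn = from_json($"resource_serialized", MapType(StringType, StringType))
    // Collect the distinct keys that occur across all rows
    val keysDF = df.select(explode(map_keys(jsonColumn))).distinct()
    val keys = keysDF.collect().map(_.getString(0))
    // Turn each key into its own column, named after the key
    val keyCols = keys.map(k => jsonColumn.getItem(k).as(k))
    df.select($"id" +: keyCols: _*).show(false)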

  }
}


The output looks like this:

+----------------------+---------------------+----------+
|id                    |createdOn            |genderCode|
+----------------------+---------------------+----------+
|00529e54-0f3d-4c76-9d3|2000-07-20 00:00:00.0|0         |
+----------------------+---------------------+----------+
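If collecting the keys to the driver is a concern, a possible alternative is to let Spark infer a struct schema from the JSON strings and pass that to from_json. This is only a sketch under assumptions: the object name InferSchemaSketch and the helper flatten are made up for illustration, and the inferred column types depend on the data and the Spark version, so check printSchema() before relying on them.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

// Hypothetical names: InferSchemaSketch and flatten are illustrative only.
object InferSchemaSketch {
  // Flattens resource_serialized using a schema inferred from the data itself.
  def flatten(spark: SparkSession, df: DataFrame): DataFrame = {
    import spark.implicits._
    // Reading the column as a Dataset[String] makes Spark scan it and infer a StructType.
    val inferred = spark.read.json(df.select("resource_serialized").as[String]).schema
    df.select(col("id"), from_json(col("resource_serialized"), inferred).as("r"))
      .select("id", "r.*")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("InferSchemaSketch").master("local[*]").getOrCreate()
    import spark.implicits._
    // Same sample row as in the question, built directly as a DataFrame.
    val df = Seq(
      ("00529e54-0f3d-4c76-9d3", """{"createdOn":"2000-07-20 00:00:00.0","genderCode":"0"}""")
    ).toDF("id", "resource_serialized")
    flatten(spark, df).show(false)
    spark.stop()
  }
}
```

Note that inference costs an extra pass over the column, so for large data a fixed schema is likely cheaper.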

Upvotes: 1

notNull

Reputation: 31540

Use the from_json function to convert the JSON string into DataFrame columns.

Example:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val sch= new StructType().add("createdOn",StringType).add("genderCode",StringType)
df.select(col("id"),from_json(col("resource_serialized"),sch).alias("str")).
select("id","str.*").
show(10,false)

//result
//+----------------------+---------------------+----------+
//|id                    |createdOn            |genderCode|
//+----------------------+---------------------+----------+
//|00529e54-0f3d-4c76-9d3|2000-07-20 00:00:00.0|0         |
//+----------------------+---------------------+----------+
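The schema above keeps both fields as strings. If typed columns are wanted, one option is to cast after parsing. The following is a sketch, not part of the original answer: the object name CastColumnsSketch and the helper parseAndCast are hypothetical, and the choice of timestamp and int as target types is an assumption about what the data means.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical names: CastColumnsSketch and parseAndCast are illustrative only.
object CastColumnsSketch {
  // Parses the JSON string with an explicit schema, then casts each field.
  def parseAndCast(df: DataFrame): DataFrame = {
    val sch = new StructType().add("createdOn", StringType).add("genderCode", StringType)
    df.select(col("id"), from_json(col("resource_serialized"), sch).as("r"))
      .select(
        col("id"),
        // Assumption: createdOn is a timestamp and genderCode is an integer code.
        col("r.createdOn").cast("timestamp").as("createdOn"),
        col("r.genderCode").cast("int").as("genderCode"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("CastColumnsSketch").master("local[*]").getOrCreate()
    import spark.implicits._
    val df = Seq(
      ("00529e54-0f3d-4c76-9d3", """{"createdOn":"2000-07-20 00:00:00.0","genderCode":"0"}""")
    ).toDF("id", "resource_serialized")
    val typed = parseAndCast(df)
    typed.printSchema()
    typed.show(false)
    spark.stop()
  }
}
```

Values that cannot be cast become null rather than failing the job, so it may be worth checking for unexpected nulls afterwards.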

If the input is valid nested JSON (that is, resource_serialized is a JSON object rather than an escaped string), you can pass the schema directly to spark.read.json:

val json = """[{"resource_serialized":{"createdOn":"2000-07-20 00:00:00.0","genderCode":"0"},"id":"00529e54-0f3d-4c76-9d3"}]"""

val sch=new StructType().
add("id",StringType).
add("resource_serialized", new StructType().add("createdOn",StringType).
add("genderCode",StringType))

spark.read.option("multiline","true").
schema(sch).
json(Seq(json).toDS).
select("id","resource_serialized.*").
show()
//+--------------------+--------------------+----------+
//|                  id|           createdOn|genderCode|
//+--------------------+--------------------+----------+
//|00529e54-0f3d-4c7...|2000-07-20 00:00:...|         0|
//+--------------------+--------------------+----------+
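Since the question also asks for a Dataset, the flattened result can be turned into a typed Dataset with a case class. This is a minimal sketch under assumptions: the case class Resource, the object TypedDatasetSketch and the helper toResources are names invented here, and all three fields are kept as strings to match the schema used above.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical case class; it must live outside the method so Spark can derive an encoder.
final case class Resource(id: String, createdOn: String, genderCode: String)

// Hypothetical names: TypedDatasetSketch and toResources are illustrative only.
object TypedDatasetSketch {
  // Parses the JSON string column and maps the flattened rows onto Resource by column name.
  def toResources(spark: SparkSession, df: DataFrame): Dataset[Resource] = {
    import spark.implicits._
    val sch = new StructType().add("createdOn", StringType).add("genderCode", StringType)
    df.select(col("id"), from_json(col("resource_serialized"), sch).as("r"))
      .select("id", "r.*")
      .as[Resource]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("TypedDatasetSketch").master("local[*]").getOrCreate()
    import spark.implicits._
    val df = Seq(
      ("00529e54-0f3d-4c76-9d3", """{"createdOn":"2000-07-20 00:00:00.0","genderCode":"0"}""")
    ).toDF("id", "resource_serialized")
    toResources(spark, df).show(false)
    spark.stop()
  }
}
```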

Upvotes: 2
