scoder

Reputation: 2611

Spark: parse internal JSON value field and create new columns

I have JSON data in which one field contains another JSON document as a string. I want to parse that inner document and create new columns from it. Below is the JSON:

{
    "start": "1234567679",
    "data": "{\"ID\": 123 ,\"changeVlaue\" : 89, \"type\" : \"sensor\"}",
    "end": "1234567689"
}

{
    "start": "1234567889",
    "data": "{\"name\": \"xyz \" ,\"changeState\" : \"Done \",\"mode\" : \"new \"}",
    "end": "1234567989"
}

{
    "start": "1234568679",
    "data": "{\"field\": \"all\" ,\"ChangedBy\" : \"Admin\", \"count\" : 2}",
    "end": "1234568999"
}

From this JSON I want to create these new columns:

 start         changeVlaue    changeState    ChangedBy    end
 1234567679    89             null           null         1234567689
 1234567889    null           Done           null         1234567989
 1234568679    null           null           Admin        1234568999

One approach I could think of is using UDFs:

import org.apache.spark.sql.functions.udf
import spark.implicits._

def getchangeVlaue(s1: String): String = {
    // parse the JSON string and return changeVlaue
    ???
}

def getchangeState(s1: String): String = {
    // parse the JSON string and return changeState
    ???
}

def getChangedBy(s1: String): String = {
    // parse the JSON string and return ChangedBy
    ???
}

val df = spark.read.json("path to json")

val tdf = df
  .withColumn("changeVlaue", udf(getchangeVlaue _)($"data"))
  .withColumn("changeState", udf(getchangeState _)($"data"))
  .withColumn("ChangedBy", udf(getChangedBy _)($"data"))

But I don't want to use the above solution, because I have 100 such different fields and would have to call withColumn 100 times.

Is there a better way, like pivot, for JSON fields?

Upvotes: 0

Views: 253

Answers (2)

Ged

Reputation: 18003

You can use foldLeft, as in this example; other variations are possible, such as using a UDF or generating the list of columns:

val df2 = df1
  .columns
  .foldLeft(df1) { (acc, colName) =>
    acc.withColumnRenamed(
      colName,
      colName.toLowerCase().replace(" ", "_")
    )
  }

Apply your own logic; this is just an example of the pattern, shown adapted to the question below.
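Applied to this question, the same foldLeft pattern can add one column per inner JSON field instead of renaming. A minimal sketch, assuming a SparkSession named spark is in scope and using a short stand-in field list (extend it to the real ~100 names):

import org.apache.spark.sql.functions.get_json_object
import spark.implicits._

// stand-in list; in the real job this would hold all ~100 inner field names
val fields = Seq("changeVlaue", "changeState", "ChangedBy")

// accumulate one withColumn per field instead of writing them out by hand
val parsed = fields.foldLeft(df) { (acc, f) =>
  acc.withColumn(f, get_json_object($"data", s"$$.$f"))
}

Because foldLeft threads the DataFrame through each step, the 100 withColumn calls are generated by the loop rather than written out.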

Upvotes: 0

Kishore

Reputation: 5881

Check this. I am using Spark 1.6.2:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setMaster("local[*]").setAppName("testing")
val sc = new SparkContext(conf)

val json =
  """[
    |  {
    |    "start": "1234567679",
    |    "data": "{\"ID\": 123 ,\"changeVlaue\" : 89, \"type\" : \"sensor\"}",
    |    "end": "1234567689"
    |  },
    |  {
    |    "start": "1234567889",
    |    "data": "{\"name\": \"xyz \" ,\"changeState\" : \"Done \",\"mode\" : \"new \"}",
    |    "end": "1234567989"
    |  },
    |  {
    |    "start": "1234568679",
    |    "data": "{\"field\": \"all\" ,\"ChangedBy\" : \"Admin\", \"count\" : 2}",
    |    "end": "1234568999"
    |  }
    |]""".stripMargin

val sqlContext = new SQLContext(sc)
val jsonrdd = sc.parallelize(Seq(json))

val inputDf = sqlContext.read.json(jsonrdd)

import sqlContext.implicits._
val df = inputDf.select("start", "data", "end")

import org.apache.spark.sql.functions.get_json_object

val dfWithData = Seq("ID", "changeVlaue", "type", "name", "changeState", "mode", "field", "ChangedBy", "count").map(
  c => get_json_object($"data", s"$$.$c").alias(c))

val dfData = df.select($"*" +: dfWithData: _*).drop("data")
dfData.show()

+----------+----------+----+-----------+------+----+-----------+----+-----+---------+-----+
|     start|       end|  ID|changeVlaue|  type|name|changeState|mode|field|ChangedBy|count|
+----------+----------+----+-----------+------+----+-----------+----+-----+---------+-----+
|1234567679|1234567689| 123|         89|sensor|null|       null|null| null|     null| null|
|1234567889|1234567989|null|       null|  null|xyz |      Done |new | null|     null| null|
|1234568679|1234568999|null|       null|  null|null|       null|null|  all|    Admin|    2|
+----------+----------+----+-----------+------+----+-----------+----+-----+---------+-----+
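If maintaining the field list by hand is impractical, one variation for Spark 2.2+ (an assumption; the answer above targets 1.6.2) is to let Spark infer the inner schema from the data column itself and flatten it with from_json. A sketch, assuming a SparkSession named spark:

import org.apache.spark.sql.functions.from_json
import spark.implicits._

// infer the schema of the inner JSON strings (costs an extra pass over the data)
val innerSchema = spark.read.json(df.select("data").as[String]).schema

// parse the string column with the inferred schema, then flatten it
val flat = df
  .withColumn("parsed", from_json($"data", innerSchema))
  .select($"start", $"parsed.*", $"end")

flat.show()

This avoids listing any field names at all, at the cost of one extra scan to infer the schema.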

Upvotes: 1
