Reputation: 2611
I have JSON data in which one field is itself another JSON document stored as a string. I want to parse that string and create new columns from its fields. Below is the JSON:
{
"start": "1234567679",
"data": "{\"ID\": 123 ,\"changeVlaue\" : 89, \"type\" : \"sensor\"}",
"end": "1234567689"
}
{
"start": "1234567889",
"data": "{\"name\": \"xyz \" ,\"changeState\" : \"Done \",\"mode\" : \"new \"}",
"end": "1234567989"
}
{
"start": "1234568679",
"data": "{\"field\": \"all\" ,\"ChangedBy\" : \"Admin\", \"count\" : 2}",
"end": "1234568999"
}
From this JSON I want to create these new columns:

start        changeVlaue   changeState   ChangedBy   end
1234567679   89            null          null        1234567689
1234567889   null          Done          null        1234567989
1234568679   null          null          Admin       1234568999
One approach I can think of is a UDF per field:
def getchangeVlaue(s1: String) = {
  // parse the JSON string in s1 and return changeVlaue
}
def getchangeState(s1: String) = {
  // parse the JSON string in s1 and return changeState
}
def getChangedBy(s1: String) = {
  // parse the JSON string in s1 and return ChangedBy
}
val df = spark.read.json("path to json")
val tdf = df
  .withColumn("changeVlaue", getchangeVlaue($"data"))
  .withColumn("changeState", getchangeState($"data"))
  .withColumn("ChangedBy", getChangedBy($"data"))
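Each of these would need to be wrapped with org.apache.spark.sql.functions.udf before being applied to a Column. As a sketch of what one of them could look like (assuming json4s, which is on Spark's classpath; the Long type for changeVlaue is my guess):

import org.apache.spark.sql.functions.udf
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Parse the JSON string and pull out a single key; None becomes null in the result.
val getchangeVlaue = udf { (s1: String) =>
  implicit val formats: Formats = DefaultFormats
  (parse(s1) \ "changeVlaue").extractOpt[Long]
}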
But I don't want to do it this way, because I have 100 such different fields and would have to call withColumn 100 times.
Is there a better way, something like pivot for JSON fields?
Upvotes: 0
Views: 253
Reputation: 18003
You can use foldLeft, for example like below; other variations are possible, such as using a UDF to generate the columns:
// Fold over the column names, applying one transformation per column.
val df2 = df1
  .columns
  .foldLeft(df1) { (df, colName) =>
    df.withColumnRenamed(colName, colName.toLowerCase().replace(" ", "_"))
  }
Apply your own logic; this is just an example of the pattern.
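Applied to this question's data, the same fold can add extracted columns instead of renaming them; a sketch, where df1 with the data string column and the fields list standing in for the ~100 names are assumptions:

import org.apache.spark.sql.functions.{col, get_json_object}

// Stand-in for the asker's ~100 field names.
val fields = Seq("changeVlaue", "changeState", "ChangedBy")

val df2 = fields.foldLeft(df1) { (df, f) =>
  // Each pass adds one column extracted from the JSON string in "data".
  df.withColumn(f, get_json_object(col("data"), s"$$.$f"))
}.drop("data")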
Upvotes: 0
Reputation: 5881
Check this. I am using Spark 1.6.2:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.get_json_object

val conf = new SparkConf().setMaster("local[*]").setAppName("testing")
val sc = new SparkContext(conf)
val json =
  """[
    |  {
    |    "start": "1234567679",
    |    "data": "{\"ID\": 123 ,\"changeVlaue\" : 89, \"type\" : \"sensor\"}",
    |    "end": "1234567689"
    |  },
    |  {
    |    "start": "1234567889",
    |    "data": "{\"name\": \"xyz \" ,\"changeState\" : \"Done \",\"mode\" : \"new \"}",
    |    "end": "1234567989"
    |  },
    |  {
    |    "start": "1234568679",
    |    "data": "{\"field\": \"all\" ,\"ChangedBy\" : \"Admin\", \"count\" : 2}",
    |    "end": "1234568999"
    |  }
    |]""".stripMargin
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val jsonrdd = sc.parallelize(Seq(json))
val inputDf = sqlContext.read.json(jsonrdd)
val df = inputDf.select("start", "data", "end")

// Pull each known field out of the "data" JSON string as its own column;
// fields absent from a given row come back as null.
val dfWithData = Seq("ID", "changeVlaue", "type", "name", "changeState", "mode", "field", "ChangedBy", "count").map(
  c => get_json_object($"data", s"$$.$c").alias(c))
val dfData = df.select($"*" +: dfWithData: _*).drop("data")
dfData.show()
+----------+----------+----+-----------+------+----+-----------+----+-----+---------+-----+
| start| end| ID|changeVlaue| type|name|changeState|mode|field|ChangedBy|count|
+----------+----------+----+-----------+------+----+-----------+----+-----+---------+-----+
|1234567679|1234567689| 123| 89|sensor|null| null|null| null| null| null|
|1234567889|1234567989|null| null| null|xyz | Done |new | null| null| null|
|1234568679|1234568999|null| null| null|null| null|null| all| Admin| 2|
+----------+----------+----+-----------+------+----+-----------+----+-----+---------+-----+
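As a side note, on Spark 2.1 or later, from_json with an explicit schema is another way to flatten the string column in one pass; a minimal sketch (the schema lists only the fields of interest, and the frame/column names mirror the ones above):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Declare only the fields you need; keys missing from a row parse to null.
val dataSchema = StructType(Seq(
  StructField("changeVlaue", LongType),
  StructField("changeState", StringType),
  StructField("ChangedBy", StringType)
))

val flattened = df
  .withColumn("parsed", from_json($"data", dataSchema))
  .select($"start", $"parsed.*", $"end")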
Upvotes: 1