Reputation: 1648
I am reading a DynamoDB table from Spark. This table has a JSON string in one field and plain strings in the other fields. I am able to read the top-level JSON fields but not the nested JSON fields. This is not a duplicate of "query Json Column using dataframes": that question explains how to extract columns from a JSON string, but not the nested JSON columns.
import com.github.traviscrawford.spark.dynamodb._
val users = sqlContext.read.dynamodb("Dynamodb_table")
users.show(1)
Sample data set:
|col1                                                           | ID | field2 | field3 |
----------------------------------------------------------------------------------------
|{"a":[{"b":"value1","x":23},{"b":"value2","x":52}],"c":"valC"} | A1 | X1     | Y1     |
I need to extract a few fields from col1 (the JSON structure) and the ID field. I was able to figure out how to parse the JSON field (col1) and get field 'c' from col1 as explained here, but I am not able to extract the nested fields.
My code:
val users = sqlContext.read.dynamodb("Dynamodb_table")
val data = users.selectExpr("get_json_object(col1, '$.c')","get_json_object(col1, '$.a')","ID")
data.show(1,false)
|a                                               |c   |ID|
----------------------------------------------------------
|[{"b":"value1","x":23},{"b":"value2","x":52}...]|valC|A1|
Now when I try to apply the same get_json_object on the above data frame, I get all null values.
val nestedData = data.selectExpr("get_json_object(a, '$.b')","c","ID")
nestedData.show(false)
|get_json_object(a, '$.b')|c   |ID |
------------------------------------
|null                     |valC|A1 |
I tried explode as well, since col 'a' has an array and a struct. But that didn't work either, as the data frame 'data' returns col/field 'a' as a string instead of an array. Any ideas how to solve this?
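For reference, this is roughly what the explode attempt looked like (just a sketch; the exact column names may differ):
import org.apache.spark.sql.functions.explode
// This fails because 'a' in 'data' is a plain JSON string, not an ArrayType column:
val exploded = data.select(explode($"a"), $"c", $"ID")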
Update: I also tried parsing using json4s and net.liftweb.json.parse. That didn't help either:
case class aInfo(b: String)
case class col1(a: Option[aInfo], c: String)

import net.liftweb.json.parse

val parseJson = udf((data: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(data).extract[col1]
})

val parsed = users.withColumn("parsedJSON", parseJson($"col1"))
parsed.show(1)
All values came out as null when I used these parsers.
My expected result: I am trying to get a flattened-out structure from the dataset:
|b     |x |c   |ID |
--------------------
|value1|23|valC|A1 |
|value2|52|valC|A1 |
Upvotes: 3
Views: 2865
Reputation: 330073
I believe that all the required pieces of the puzzle are already here, so let's follow this step by step. Your data is equivalent to:
val df = Seq((
  """{"a":[{"b":"value1"},{"b": "value2"}],"c":"valC"}""", "A1", "X1", "Y1"
)).toDF("col1", "ID", "field2", "field3")
Spark provides json4s which implements the same query API as Lift:
import org.json4s._
import org.json4s.jackson.JsonMethods._
and we can use, for example, the LINQ-style API to define a UDF:
import org.apache.spark.sql.functions.{explode, udf}

val getBs = udf((s: String) => for {
  JString(b) <- parse(s) \ "a" \ "b"
} yield b)
If you want to extract multiple fields you can of course extend this. For example, if the JSON string has multiple fields:
{"a":[{"b":"value1","d":1},{"b":"value2","d":2}],"c":"valC"}
you can:
for {
  JObject(a) <- parse(s) \ "a"
  JField("b", JString(b)) <- a
  JField("d", JInt(d)) <- a
} yield (b, d)
This assumes that both fields are present; otherwise there won't be a match. To handle missing fields you may prefer XPath-like expressions or extractors:
case class A(b: Option[String], d: Option[Int])
(parse(s) \ "a").extract[Seq[A]]
A UDF like this can be used with explode to extract fields:
val withBs = df.withColumn("b", explode(getBs($"col1")))
with result:
+--------------------+---+------+------+------+
| col1| ID|field2|field3| b|
+--------------------+---+------+------+------+
|{"a":[{"b":"value...| A1| X1| Y1|value1|
|{"a":[{"b":"value...| A1| X1| Y1|value2|
+--------------------+---+------+------+------+
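The multi-field variant works the same way; a rough sketch (getBD is just an illustrative name):
// Sketch: return (b, d) pairs so both fields survive the explode.
val getBD = udf((s: String) => for {
  JObject(a) <- parse(s) \ "a"
  JField("b", JString(b)) <- a
  JField("d", JInt(d)) <- a
} yield (b, d.toInt))

// Applied to rows shaped like the extended example above:
// df.withColumn("bd", explode(getBD($"col1"))).select($"ID", $"bd._1".as("b"), $"bd._2".as("d"))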
Your attempt to use Lift is incorrect because you expect a to be a sequence of aInfo objects but define it only as Option[aInfo]. It should be Option[Seq[aInfo]]:
case class col1(a: Option[Seq[aInfo]], c: String)
With the class defined like this, parsing should work without an issue.
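For example, a minimal sketch (parseA is just an illustrative name; it pulls the b values out of the parsed structure so the result can be exploded):
case class aInfo(b: String)
case class col1(a: Option[Seq[aInfo]], c: String)

val parseA = udf((s: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  net.liftweb.json.parse(s).extract[col1].a.getOrElse(Nil).map(_.b)
})

// df.withColumn("b", explode(parseA($"col1")))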
If you use a current build (Spark 2.1.0) there is a from_json function, introduced by SPARK-17699, which requires a schema:
import org.apache.spark.sql.types._
val bSchema = StructType(Seq(StructField("b", StringType, true)))
val aSchema = StructField("a", ArrayType(bSchema), true)
val cSchema = StructField("c", StringType, true)
val schema = StructType(Seq(aSchema, cSchema))
and can be applied as:
import org.apache.spark.sql.functions.from_json
val parsed = df.withColumn("col1", from_json($"col1", schema))
After that you can select fields using the usual notation:
parsed.select($"col1.a.b")
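If you need one row per array element, as in your expected output, you can explode the parsed array first. A sketch along those lines (the schema above only covers b; add an x field to bSchema if you need it as well):
import org.apache.spark.sql.functions.explode

parsed
  .withColumn("a", explode($"col1.a"))
  .select($"a.b".as("b"), $"col1.c".as("c"), $"ID")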
Upvotes: 2