Navin Agarwala

Reputation: 1

Reading Nested JSON in Spark Structured Streaming

I am trying to read data from Kafka using Structured Streaming. The data received from Kafka is in JSON format. I use a sample JSON file to create the schema, and later in the code I use the from_json function to convert the JSON to a DataFrame for further processing. The problem I am facing is with nested schemas and multi-valued fields. The sample schema defines a tag (say a) as a struct, but the JSON data read from Kafka can have either one or multiple values for the same tag (in two different messages).

import org.apache.spark.sql.functions.from_json
import spark.implicits._  // for the $"..." column syntax

// Infer the schema from a sample JSON file
val df0 = spark.read.format("json").load("contactSchema0.json")
val schema0 = df0.schema

// Read the raw Kafka stream and parse the value column with the inferred schema
val df1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "node1:9092").option("subscribe", "my_first_topic").load()
val df2 = df1.selectExpr("CAST(value as STRING)").toDF()
val df3 = df2.select(from_json($"value", schema0).alias("value"))

contactSchema0.json has a sample tag as follows:

"contactList": {
        "contact": [{
          "id": 1001
},
{
 "id": 1002
}]
}

Thus contact is inferred as an array of structs. But the JSON data read from Kafka can also have data as follows:

"contactList": {
                "contact": {
                  "id": 1001
        }
    }

So if I define contact as an array in the schema, from_json cannot parse the messages with a single value, and if I define it as a plain struct, from_json cannot parse the messages with multiple values.
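
To make the clash concrete, here is a minimal batch-mode sketch of the mismatch (not streaming, and assuming a SparkSession named spark); the array-based schema is built by hand, and the single-value message does not conform, so from_json appears to give null for it under the default PERMISSIVE parsing:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

// Schema as inferred from contactSchema0.json: contact is an array of structs
val arraySchema = new StructType()
  .add("contactList", new StructType()
    .add("contact", ArrayType(new StructType().add("id", LongType))))

// One multi-value message and one single-value message
val messages = Seq(
  """{"contactList": {"contact": [{"id": 1001}, {"id": 1002}]}}""",
  """{"contactList": {"contact": {"id": 1001}}}"""
).toDF("value")

// The single-value message does not match the array schema, so its parsed
// column comes back as null
messages.select(from_json($"value", arraySchema).alias("value")).show(false)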

Upvotes: 0

Views: 985

Answers (1)

ollik1

Reputation: 4540

I can't find such a feature in the Spark JSON options, but Jackson has DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, as described in this answer.
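
For reference, a minimal, non-Spark sketch of what that Jackson flag does on its own (the Contact case class and the literal JSON strings here are just illustrative):

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

case class Contact(id: Int)

val mapper = new ObjectMapper()
  .registerModule(DefaultScalaModule)
  .enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY)

// Both a JSON array and a bare object deserialize to Array[Contact]
val many = mapper.readValue("""[{"id": 1001}, {"id": 1002}]""", classOf[Array[Contact]])
val one  = mapper.readValue("""{"id": 1001}""", classOf[Array[Contact]])
println(many.length) // 2
println(one.length)  // 1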

So we can work around it with something like this:

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import spark.implicits._

case class MyModel(contactList: ContactList)
case class ContactList(contact: Array[Contact])
case class Contact(id: Int)

// One JSON record per line: the first uses an array, the second a single object
val txt =
  """|{"contactList": {"contact": [{"id": 1001}]}}
     |{"contactList": {"contact": {"id": 1002}}}"""
    .stripMargin.lines.toSeq.toDS()

txt
  .mapPartitions[MyModel] { it: Iterator[String] =>
    // Build the Jackson reader once per partition, not once per record
    val reader = new ObjectMapper()
      .registerModule(DefaultScalaModule)
      .enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
      .readerFor(classOf[MyModel])
    // With ACCEPT_SINGLE_VALUE_AS_ARRAY, a bare object is wrapped into a one-element array
    it.map(reader.readValue[MyModel])
  }
  .show()

Output:

+-----------+
|contactList|
+-----------+
| [[[1001]]]|
| [[[1002]]]|
+-----------+

Note that to get a Dataset in your code, you could use

val df2 = df1.selectExpr("CAST(value as STRING)").as[String]

instead, and then call mapPartitions on df2 as before.
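
Putting it together, a rough sketch of the streaming version might look like this, assuming the same case classes and Jackson imports as above (the console sink and the query wiring are illustrative assumptions, not part of your original pipeline):

val parsed = df1
  .selectExpr("CAST(value AS STRING)")
  .as[String]
  .mapPartitions[MyModel] { it: Iterator[String] =>
    val reader = new ObjectMapper()
      .registerModule(DefaultScalaModule)
      .enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
      .readerFor(classOf[MyModel])
    it.map(reader.readValue[MyModel])
  }

// For example, write the typed stream to the console sink
val query = parsed.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()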

Upvotes: 2
