arjunj

Reputation: 1516

Convert a json string to array of key-value pairs in Spark scala

I have a JSON string that I load into a Spark DataFrame. The JSON string can have between 0 and 3 key-value pairs.

When more than one key-value pair is sent, product_facets.entry is correctly formatted as an array, like below:

{
  "id": 1,
  "productData": {
    "product": {
      "product_name": "xyz",
      "product_facets": {
        "entry": [
          {"key": "test", "value": "success"},
          {"key": "test2", "value": "fail"}
        ]
      }
    }
  }
}

I can now use the explode function:

sourceDF.filter($"someKey".contains("some_string"))
  .select($"id", explode($"productData.product.product_facets.entry") as "kvPairs")

However, when only one key-value pair is sent, entry in the source JSON string is not formatted as an array with square brackets:

{
  "id": 1,
  "productData": {
    "product": {
      "product_name": "xyz",
      "product_facets": {
        "entry": {"key": "test", "value": "success"}
      }
    }
  }
}

The schema for the product tag looks like:

|    |-- product: struct (nullable = true)
|    |    |-- product_facets: struct (nullable = true)
|    |    |    |-- entry: string (nullable = true)
|    |    |-- product_name: string (nullable = true)

How can I convert entry to an array of key-value pairs that is compatible with the explode function? My end goal is to pivot the keys into individual columns, grouping by id after exploding the kv pairs. I tried using from_json but could not get it to work:

val schema = StructType(
  Seq(
    StructField("entry", ArrayType(
      StructType(
        Seq(
          StructField("key", StringType),
          StructField("value", StringType)
        )
      )
    ))
  )
)

sourceDF.filter($"someKey".contains("some_string"))
      .select($"id", from_json($"productData.product.product_facets.entry", schema) as "kvPairsFromJson")

But the above creates a new column kvPairsFromJson that looks like "[]", and using explode on it does nothing.

Any pointers on what's going on, or is there a better way to do this?

Upvotes: 1

Views: 4524

Answers (1)

kode

Reputation: 384

I think one approach could be:

1. Create a UDF that takes the entry value as a JSON string and converts it to a List(Tuple(K, V)).
2. In the UDF, check whether the entry value is an array or not, and do the conversion accordingly.

The code below implements the above approach:

// one row where entry is array and other non-array
val ds = Seq("""{"id":1,"productData":{"product":{"product_name":"xyz","product_facets":{"entry":[{"key":"test","value":"success"},{"key":"test2","value":"fail"}]}}}}""", """{"id":2,"productData":{"product":{"product_name":"xyz","product_facets":{"entry":{"key":"test","value":"success"}}}}}""").toDS

val df = spark.read.json(ds)

// Schema used by the udf to generate the output column
import org.apache.spark.sql.types._
val outputSchema = ArrayType(StructType(Seq(
  StructField("key", StringType, false),
  StructField("value", StringType, false)
)))

// Converts non-array entry value to array
val toArray = udf((json: String) => {

  import com.fasterxml.jackson.databind._
  import com.fasterxml.jackson.module.scala.DefaultScalaModule

  val jsonMapper = new ObjectMapper()
  jsonMapper.registerModule(DefaultScalaModule)

  if (!json.startsWith("[")) {
    // single kv pair: entry is a plain object, wrap it in a one-element list
    val jsonMap = jsonMapper.readValue(json, classOf[Map[String, String]])
    List((jsonMap("key"), jsonMap("value")))
  } else {
    // entry is already a JSON array of kv pairs
    jsonMapper.readValue(json, classOf[List[Map[String, String]]]).map(f => (f("key"), f("value")))
  }

}, outputSchema)

// keep the kv pairs as an array column
val arrayResult = df.select(
  col("id"),
  toArray(col("productData.product.product_facets.entry")).as("entry"))

// one row per kv pair
val arrayExploded = df.select(
  col("id"),
  explode(toArray(col("productData.product.product_facets.entry"))).as("entry"))

// split key and value into their own columns
val explodedToCols = df.select(
    col("id"),
    explode(toArray(col("productData.product.product_facets.entry"))).as("entry"))
  .select(col("id"), col("entry.key").as("key"), col("entry.value").as("value"))

Results in:

scala> arrayResult.printSchema
root
 |-- id: long (nullable = true)
 |-- entry: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = false)
 |    |    |-- value: string (nullable = false)


scala> arrayExploded.printSchema
root
 |-- id: long (nullable = true)
 |-- entry: struct (nullable = true)
 |    |-- key: string (nullable = false)
 |    |-- value: string (nullable = false)

scala> arrayResult.show(false)
+---+--------------------------------+
|id |entry                           |
+---+--------------------------------+
|1  |[[test, success], [test2, fail]]|
|2  |[[test, success]]               |
+---+--------------------------------+

scala> arrayExploded.show(false)
+---+---------------+
|id |entry          |
+---+---------------+
|1  |[test, success]|
|1  |[test2, fail]  |
|2  |[test, success]|
+---+---------------+
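Since the stated end goal is to pivot the keys into individual columns, explodedToCols can then be grouped by id and pivoted on key. A minimal sketch; the rows below are hard-coded to match the explodedToCols output above, and the local SparkSession setup is only there to make the example self-contained:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("pivot-kv").getOrCreate()
import spark.implicits._

// same shape as explodedToCols: one row per (id, key, value)
val explodedToCols = Seq(
  (1L, "test", "success"),
  (1L, "test2", "fail"),
  (2L, "test", "success")
).toDF("id", "key", "value")

// pivot the keys into columns; first(value) picks the single value per (id, key),
// ids missing a key get null in that column
val pivoted = explodedToCols.groupBy("id").pivot("key").agg(first("value"))
pivoted.show(false)
```

With the sample data this yields columns id, test and test2, with a null test2 for id 2.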

Upvotes: 1
