Surender Raja

Reputation: 3609

Json Parsing throws unexpected output inside Spark UDF

I have a DataFrame in which every column is of type string. Some of the columns hold JSON strings.

 +--------+---------+--------------------------+
 |event_id|event_key|              rights      |
 +--------+---------+--------------------------+
 |     410|(default)|{"conditions":[{"devic...|
 +--------+---------+--------------------------+

I want to parse only that JSON string, take one value from it, and add that value as a new column. I am using the Jackson parser to do that.

Here is the value of "rights"

 {
"conditions": [
    {
        "devices": [
            {
                "connection": [
                    "BROADBAND",
                    "MOBILE"
                ],
                "platform": "IOS",
                "type": "MOBILE",
                "provider": "TELETV"
            },
            {
                "connection": [
                    "BROADBAND",
                    "MOBILE"
                ],
                "platform": "ANDROID",
                "type": "MOBILE",
                "provider": "TELETV"
            },
            {
                "connection": [
                    "BROADBAND",
                    "MOBILE"
                ],
                "platform": "IOS",
                "type": "TABLET",
                "provider": "TELETV"
            },
            {
                "connection": [
                    "BROADBAND",
                    "MOBILE"
                ],
                "platform": "ANDROID",
                "type": "TABLET",
                "provider": "TELETV"
            }
        ],
        "endDateTime": "2017-01-09T22:59:59.000Z",
        "inclusiveGeoTerritories": [
            "DE",
            "IT",
            "ZZ"
        ],
        "mediaType": "Linear",
        "offers": [
            {
                "endDateTime": "2017-01-09T22:59:59.000Z",
                "isRestartable": true,
                "isRecordable": true,
                "isCUTVable": false,
                "recordingMode": "UNIQUE",
                "retentionCUTV": "P7DT2H",
                "retentionNPVR": "P2Y6M5DT12H35M30S",
                "offerId": "MOTOGP-RACE",
                "offerType": "IPPV",
                "startDateTime": "2017-01-09T17:00:00.000Z"
            }
        ],
        "platformName": "USA",
        "startDateTime": "2017-01-09T17:00:00.000Z",
        "territory": "USA"
    }
 ]
}

Now I want to add a new column named "provider" to the existing DataFrame. Its value should come from this path in the JSON:

 conditions -> devices -> provider

I want to do this for every row in the DataFrame. So I created a UDF and pass it the column that holds the JSON string. Inside the UDF I parse the JSON string and return the value as a string.

My Spark code:

 import org.apache.spark.sql.functions.udf
 import org.apache.spark.sql.functions._
 import org.json4s._
 import org.json4s.jackson.JsonMethods
 import org.json4s.jackson.JsonMethods._


  // ... code that derives the base dataframe df ...

  val fetchProvider_udf = udf(fetchProvider _)
  val result = df.withColumn("provider", fetchProvider_udf(col("rights")))
  result.select("event_id", "event_key", "rights", "provider").show(10)


  def fetchProvider(jsonStr: String): String = {
    val json = JsonMethods.parse(jsonStr)
    val providerData = json \\ "conditions" \\ "devices" \\ "provider"
    compact(render(providerData))
  }

Also, how do I handle a navigation key that is not available? Does it throw an exception? Let's say "conditions" and "devices" are present but the "provider" key is missing from the JSON string. How do I handle that?

Could someone help me?

Expected output:

 +--------+---------+-------------------------+--------+
 |event_id|event_key|                   rights|provider|
 +--------+---------+-------------------------+--------+
 |     410|(unknown)|{"conditions":[{"devic...|  TELETV|
 +--------+---------+-------------------------+--------+

But I am getting the output below:

 +--------+---------+-------------------------+---------------------------------------------------------------------------------+
 |event_id|event_key|                   rights|                                                                         provider|
 +--------+---------+-------------------------+---------------------------------------------------------------------------------+
 |     410|(unknown)|{"conditions":[{"devic...|{"provider":"TELETV","provider":"TELETV","provider":"TELETV","provider":"TELETV"}|
 +--------+---------+-------------------------+---------------------------------------------------------------------------------+

Upvotes: 1

Views: 1601

Answers (1)

Mariusz

Reputation: 13936

If you want to extract the value of the first provider, use the following expression inside the UDF:

(json \\ "conditions" \\ "devices")(0) \\ "provider"

The current code collects every provider in the document. json4s wraps multiple matches of \\ in a single object with repeated keys, and that object is then rendered to a string as the UDF result.
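To make the difference concrete, here is a minimal sketch that runs without Spark. The object name ProviderLookupDemo and the trimmed two-device sample are made up for illustration; only parse, \\ and index access on a JValue come from json4s.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Hypothetical demo object; the JSON is a cut-down version of "rights".
object ProviderLookupDemo {
  val json: JValue = parse(
    """{"conditions":[{"devices":[
      |  {"platform":"IOS","provider":"TELETV"},
      |  {"platform":"ANDROID","provider":"TELETV"}
      |]}]}""".stripMargin)

  // Recursive search over the whole tree: with more than one match,
  // json4s returns a single JObject that repeats the key.
  val all: JValue = json \\ "provider"

  // Index into the devices array first, so only one match remains.
  val first: JValue = (json \\ "devices")(0) \\ "provider"

  def main(args: Array[String]): Unit = {
    println(compact(render(all)))
    println(first) // JString(TELETV)
  }
}
```

Note that compact(render(...)) on a JString keeps the JSON quotes, so pattern matching on JString is usually the simpler way to get the bare value.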

You should also ensure that your UDF does not throw any exception, because that would cause the whole job to fail. The easiest approach is to return null from the UDF and then:

  • if you want to investigate, filter by col("provider").isNull
  • if you want to keep only valid entries, filter by col("provider").isNotNull
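A null-returning UDF along those lines could look like the sketch below. It is one possible shape, not the only one: the names SafeProvider, firstProvider and fetchProviderUdf are illustrative, and it assumes that "the first string-valued provider found under conditions -> devices" is the value wanted.

```scala
import org.apache.spark.sql.functions.udf
import org.json4s._
import org.json4s.jackson.JsonMethods.parse
import scala.util.Try

// Hypothetical helper object, not part of the original question.
object SafeProvider {
  // Picks the first string-valued "provider" under conditions -> devices.
  // A missing key yields JNothing or an empty JArray, never an exception.
  private def firstProvider(json: JValue): Option[String] =
    json \ "conditions" \ "devices" \ "provider" match {
      case JArray(values) => values.collectFirst { case JString(p) => p }
      case JString(p)     => Some(p)
      case _              => None
    }

  // Returns null for null input, unparseable JSON, or a missing provider.
  def fetchProvider(jsonStr: String): String =
    Option(jsonStr)
      .flatMap(s => Try(parse(s)).toOption)
      .flatMap(firstProvider)
      .orNull

  val fetchProviderUdf = udf(fetchProvider _)
}
```

It would be applied as df.withColumn("provider", SafeProvider.fetchProviderUdf(col("rights"))), after which the isNull / isNotNull filters above separate the bad rows from the good ones.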

Upvotes: 0
