Manoj Kumar G

Reputation: 502

Parsing a JSON column and tokenizing only the values in Scala Spark code

I have a DataFrame with a string column named "personData" that contains a JSON message captured from a Kafka topic. I need to parse the JSON, tokenize only the values of the (key, value) pairs, and replace "personData" with the tokenized JSON string.
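
For example, with made-up token values, an input like

{"name": "John", "address": {"city": "New York"}}

should come back as

{"name": "tok_91ac", "address": {"city": "tok_55fe"}}

with every key and the nesting left untouched; only the string values are replaced.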

I have a Hive UDF called "mmtok" which takes two parameters: the value to be tokenized and the tokenization policy.
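
For reference, this is how I call it directly through Spark SQL (the first argument is the value to tokenize, the second is the policy; this is the same pattern used inside tokenizeJson below):

// Direct call of the Hive UDF via Spark SQL
val sampleToken = spark.sql("SELECT mmtok('John', 'myfunc')").collect()(0)(0)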

I have tried the code below and it works in spark-shell, but it fails when I run it on the cluster. Can someone help me tweak this code so that it works on the cluster as well?

// Required imports (json4s for parsing, Spark SQL functions and implicits)
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._

// Define a tokenization function

def tokenizeJson(jsonValue: JValue): JValue = {
  try {
    // Tokenization logic - only the values are tokenized, keys stay as they are
    jsonValue.transformField {
      case JField(key, JNull) => (key, JNull)
      case JField(key, JString(value)) if value.trim.isEmpty => (key, JString(""))
      case JField(key, JString(value)) =>
        // Call the Hive UDF through Spark SQL with the 'myfunc' policy
        val tokenValue = Option(spark.sql(s"SELECT mmtok('$value', 'myfunc')").collect()(0)(0)).map(_.toString).getOrElse("")
        (key, JString(tokenValue))
      case JField(key, JObject(innerFields)) => (key, tokenizeJson(JObject(innerFields)))
    }
  } catch {
    case e: Exception =>
      println(s"Error while processing JSON: ${e.getMessage}")
      JNothing
  }
}
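
For anyone not familiar with json4s, here is a minimal, Spark-free sketch of the transformField behaviour the function above relies on (upper-casing stands in for the real tokenizer; the nested object is visited because transformField walks the whole tree):

import org.json4s._
import org.json4s.jackson.JsonMethods._

val sample = parse("""{"name":"John","address":{"city":"New York"}}""")

// Replace every string value, leaving keys and structure untouched
val upperCased = sample.transformField {
  case JField(key, JString(value)) => (key, JString(value.toUpperCase))
}

println(compact(render(upperCased)))
// prints: {"name":"JOHN","address":{"city":"NEW YORK"}}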

// UDF to parse the JSON, tokenize it and render the result back to a string

val tokenizeJsonUDF = udf((jsonStr: String) => {
  val parsedJson = parse(jsonStr)
  val tokenizedJson = tokenizeJson(parsedJson)
  compact(render(tokenizedJson))
})

// Sample JSON data

val myStr = """{
  "name": "John",
  "address": {
    "street": "5th Avenue",
    "city": {
      "name": "New York",
      "details": {
        "population": "8 million",
        "area": "783.8 km²"
      }
    }
  }
}"""

// Sample data including the JSON column
val data = Seq(("John", "Business", myStr, "US", "Travel"))

// Creating a DataFrame
val myDF = data.toDF("name", "profession", "personData", "country", "hobby")

// Final DataFrame with tokenized values in the JSON
val finalDF = myDF.withColumn("personData", tokenizeJsonUDF(col("personData")))

finalDF.show(false)

This code tokenizes the JSON values perfectly when run in spark-shell, but it fails in the cluster environment. Please check and let me know what tweaks can be made to make it work on the cluster.

Upvotes: 0

Views: 40

Answers (0)
