Reputation: 502
I have a DataFrame with a string column named "personData" that contains a JSON message captured from a Kafka topic. I need to parse the JSON, tokenize only the values of the (key, value) pairs, and replace "personData" with the tokenized result.
I have a Hive UDF called "mmtok" that takes two parameters: the field to be tokenized and the tokenization policy.
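For context, the UDF is made available to Spark SQL roughly like this (a minimal sketch; the JAR path and class name below are placeholders, not the real artifacts):
// Hypothetical registration; the JAR and class stand in for the real tokenizer
spark.sql("ADD JAR /path/to/tokenizer-udf.jar")
spark.sql("CREATE TEMPORARY FUNCTION mmtok AS 'com.example.hive.MmtokUDF'")
// Sanity check: tokenize a literal with the 'myfunc' policy
spark.sql("SELECT mmtok('John', 'myfunc')").show()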
I have tried the code below, and it works in spark-shell, but it fails when I run it on the cluster. Can someone help me tweak this code so it works on the cluster as well?
// Required imports (spark-shell provides some of these implicitly)
import org.apache.spark.sql.functions.{col, udf}
import org.json4s._
import org.json4s.jackson.JsonMethods._
import spark.implicits._

// Define a tokenization function
def tokenizeJson(jsonValue: JValue): JValue = {
  try {
    // Tokenization logic: only the values are tokenized, keys are kept as-is
    jsonValue.transformField {
      case JField(key, JNull) => (key, JNull)
      case JField(key, JString(value)) if value.trim.isEmpty => (key, JString(""))
      case JField(key, JString(value)) =>
        // Call the Hive UDF through Spark SQL for every string value
        val tokenValue = Option(spark.sql(s"SELECT mmtok('$value', 'myfunc')").collect()(0)(0))
          .map(_.toString)
          .getOrElse("")
        (key, JString(tokenValue))
      case JField(key, JObject(innerFields)) => (key, tokenizeJson(JObject(innerFields)))
    }
  } catch {
    case e: Exception =>
      println(s"Error while processing JSON: ${e.getMessage}")
      JNothing
  }
}
// UDF to parse, tokenize, and render the tokenized JSON back to a string
val tokenizeJsonUDF = udf((jsonStr: String) => {
  val parsedJson = parse(jsonStr)
  val tokenizedJson = tokenizeJson(parsedJson)
  compact(render(tokenizedJson))
})
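To illustrate the json4s round trip the UDF performs, here is a minimal standalone sketch with the tokenizer stubbed out as uppercase (toUpperCase is purely illustrative; the real code goes through mmtok):
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Stub demo: transformField visits every field in the tree, so nested
// string values get rewritten too (toUpperCase stands in for tokenization)
val demo = parse("""{"a": "x", "b": {"c": "y"}}""")
val rewritten = demo.transformField {
  case JField(k, JString(v)) => (k, JString(v.toUpperCase))
}
println(compact(render(rewritten))) // prints {"a":"X","b":{"c":"Y"}}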
// Sample JSON data with nested objects
val myStr = """{
  "name": "John",
  "address": {
    "street": "5th Avenue",
    "city": {
      "name": "New York",
      "details": {
        "population": "8 million",
        "area": "783.8 km²"
      }
    }
  }
}"""
// Sample data including the JSON column; note the single-element Seq of a
// tuple, so toDF can map the five values to five columns
val data = Seq(("John", "Business", myStr, "US", "Travel"))
// Creating a DataFrame
val myDF = data.toDF("name", "profession", "personData", "country", "hobby")
// Final DataFrame with the tokenized JSON values
val finalDF = myDF.withColumn("personData", tokenizeJsonUDF(col("personData")))
finalDF.show(false)
This code tokenizes the JSON values perfectly when run in spark-shell but fails in the cluster environment. Please check and let me know what tweaks can be made to make it work on the cluster.
Upvotes: 0
Views: 40