Reputation: 13
{
  "Register": {
    "Persons": [
      {
        "Name": "Name1",
        "Age": 12,
        "Address": [
          {
            "Apt": "Apt1"
          }
        ],
        "Phone": [
          {
            "PhoneNum": 1234
          }
        ]
      },
      {
        "Name": "Name2",
        "Age": 14,
        "Address": [
          {
            "Apt": "Apt2"
          }
        ],
        "Phone": [
          {
            "PhoneNum": 55555
          }
        ]
      }
    ]
  }
}
I need to create a new DF based on the condition Apt = Apt1 and change the phone number of that entry to 7777. NB: I need to keep the same structure. I have tried a couple of methods in Scala-Spark, but I am not able to update the nested array struct type. Any expert advice would be helpful.
Update: Following this link, I am able to get the named_struct variables. When it comes to arrays, I am not able to get the answer. https://kb.databricks.com/data/update-nested-column.html#how-to-update-nested-columns
Upvotes: 1
Views: 1292
Reputation: 14845
The idea is to use case classes to convert the nested structure into a set of simple Scala classes that can be handled more easily, or in Spark terms: use a (typed) Dataset instead of an untyped DataFrame.
case class Phone(PhoneNum: String)
case class Apt(Apt: String)
case class Person(Name: String, Age: Long, Address: Array[Apt], Phone: Array[Phone])
case class Register(Persons: Array[Person])
case class TopLevel(Register: Register)
Convert the DataFrame into a Dataset and then apply a map call on each entry of the Dataset:
val df = ...
val ds = df.as[TopLevel]
val transformed = ds.map(tl => {
  for (p <- tl.Register.Persons) {
    if (p.Address.contains(Apt("Apt1"))) p.Phone.transform(_ => Phone("7777"))
  }
  tl
})
transformed.toJSON.show(false)
prints:
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"Register":{"Persons":[{"Name":"Name1","Age":12,"Address":[{"Apt":"Apt1"}],"Phone":[{"PhoneNum":"7777"}]},{"Name":"Name2","Age":14,"Address":[{"Apt":"Apt2"}],"Phone":[{"PhoneNum":"55555"}]}]}}|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
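Note that .as[TopLevel] and the typed map need the implicit encoders of a SparkSession in scope. A minimal setup sketch, not part of the original answer (the session variable, master setting and file name are assumptions):

import org.apache.spark.sql.SparkSession

// assumed setup, not shown in the answer: a local SparkSession and its implicit encoders
val spark = SparkSession.builder().appName("nested-update").master("local[*]").getOrCreate()
import spark.implicits._ // required for df.as[TopLevel] and the typed map

// one way to obtain df: read the question's JSON as a multiline file (file name is hypothetical)
val df = spark.read.option("multiline", "true").json("register.json")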
A remark on the data structure/schema in the question:
As the question is asked, a dataframe of registers is used. That means that each entry of the dataframe contains a single register. It would be more intuitive if the dataframe contained a list of persons and if this list of persons was called "Register". This would lead to a much simpler data structure: in that case, the classes TopLevel and Register could be omitted, as sketched below.
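To illustrate that simplification, a hypothetical sketch (assuming one person per row in a JSON Lines file, spark.implicits._ in scope as above, and an assumed file name):

// with one Person per dataset row, no TopLevel/Register wrapper classes are needed
case class Apt(Apt: String)
case class Phone(PhoneNum: String)
case class Person(Name: String, Age: Long, Address: Array[Apt], Phone: Array[Phone])

val persons = spark.read.json("persons.json").as[Person] // hypothetical file, one JSON object per line
val updated = persons.map { p =>
  // replace the phone entries only for persons living in "Apt1"
  if (p.Address.contains(Apt("Apt1"))) p.copy(Phone = p.Phone.map(_ => Phone("7777"))) else p
}
updated.toJSON.show(false)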
Upvotes: 1
Reputation: 596
The first step is to map your JSON into a DataFrame. Then we create a custom UDF that takes as input the Apt column, the PhoneNum column and the new phone number, and changes the phone number if Apt = Apt1.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, lit, udf}

object PhoneUpdateJob { // hypothetical enclosing object; the answer only shows its members

  // the answer references `sparkSession` without showing its creation; a local session is assumed here
  val sparkSession: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

  def main(args: Array[String]): Unit = {
    val inputJson = "{\"Register\":{\"Persons\":[{\"Name\":\"Name1\",\"Age\":12,\"Address\":[{\"Apt\":\"Apt1\"}],\"Phone\":[{\"PhoneNum\":1234}]},{\"Name\":\"Name2\",\"Age\":14,\"Address\":[{\"Apt\":\"Apt2\"}],\"Phone\":[{\"PhoneNum\":55555}]}]}}"

    import sparkSession.implicits._

    val outputDataFrame = sparkSession.read.option("multiline", true).option("mode", "PERMISSIVE")
      .json(Seq(inputJson).toDS)
      .select(
        // First layer mapping
        col("Register").getItem("Persons").as("Persons")
      )
      .withColumn("Persons", explode(col("Persons")))
      .select(
        // Second layer mapping
        col("Persons").getItem("Name").as("Name"),
        col("Persons").getItem("Age").as("Age"),
        col("Persons").getItem("Address").as("Address"),
        col("Persons").getItem("Phone").as("Phone")
      )
      .select(
        col("Name"), col("Age"),
        // Last layer mapping
        col("Address").getItem("Apt").as("Apt"),
        col("Phone").getItem("PhoneNum").as("PhoneNum")
      )
      .withColumn("Apt", explode(col("Apt")))
      .withColumn("PhoneNum", explode(col("PhoneNum")))
      // apply the user-defined function to change PhoneNum according to Apt
      .withColumn("PhoneNum", changePhoneNumUDF(col("Apt"), col("PhoneNum"), lit(987654)))

    outputDataFrame.show
  }

  // returns the new number when Apt is "Apt1", otherwise keeps the old one
  def changePhoneNum(Apt: String, oldPhoneNum: Long, NewPhoneNum: Long): Long = Apt match {
    case "Apt1" => NewPhoneNum
    case _      => oldPhoneNum
  }

  val changePhoneNumUDF = udf(changePhoneNum _)
}
Output:
+-----+---+----+--------+
| Name|Age| Apt|PhoneNum|
+-----+---+----+--------+
|Name1| 12|Apt1| 987654|
|Name2| 14|Apt2| 55555|
+-----+---+----+--------+
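This answer returns a flattened frame, whereas the question asks to keep the original structure. A hedged sketch, not part of the original answer, of how the flattened columns could be re-nested with struct, array and collect_list:

import org.apache.spark.sql.functions.{array, col, collect_list, struct}

// re-nest the flattened rows back into the Register/Persons shape
// (sketch: assumes outputDataFrame is the frame shown above with columns Name, Age, Apt, PhoneNum)
val renested = outputDataFrame
  .select(
    col("Name"), col("Age"),
    array(struct(col("Apt"))).as("Address"),
    array(struct(col("PhoneNum"))).as("Phone"))
  .agg(collect_list(struct(col("Name"), col("Age"), col("Address"), col("Phone"))).as("Persons"))
  .select(struct(col("Persons")).as("Register"))

renested.toJSON.show(false)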
Upvotes: 0