SRV

Reputation: 13

Spark/Scala nested array DF - How to update a value based on a condition without changing the structure?

I have a structure like the following in orc/parquet format.

{
  "Register": {
    "Persons": [
      {
        "Name": "Name1",
        "Age": 12,
        "Address": [
          {
            "Apt": "Apt1"
          }
        ],
        "Phone": [
          {
            "PhoneNum": 1234
          }
        ]
      },
      {
        "Name": "Name2",
        "Age": 14,
        "Address": [
          {
            "Apt": "Apt2"
          }
        ],
        "Phone": [
          {
            "PhoneNum": 55555
          }
        ]
      }

    ]
  }
}

I need to create a new DF where, for every entry whose Address contains Apt = Apt1, the phone number is changed to 7777. NB: the structure must stay the same. I have tried a couple of methods in Scala/Spark, but was not able to update the nested array struct type. Any expert advice would be helpful.

Update: Following the link below, I am able to update named_struct fields. When it comes to arrays, however, I cannot get it to work. https://kb.databricks.com/data/update-nested-column.html#how-to-update-nested-columns
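(Editor's note: one way to do exactly this without flattening the schema, on Spark 2.4+, is to rebuild the struct with the transform/exists higher-order SQL functions. A minimal sketch, assuming the schema shown above; the object and function names are illustrative:)

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr

object NestedArrayUpdate {
  // Rebuild Register field by field: Phone is rewritten only for persons whose
  // Address array contains Apt = 'Apt1'; everything else is copied as-is, so
  // the schema is preserved.
  def updatePhones(df: DataFrame): DataFrame =
    df.withColumn("Register", expr(
      """named_struct('Persons',
        |  transform(Register.Persons, p -> named_struct(
        |    'Name', p.Name,
        |    'Age', p.Age,
        |    'Address', p.Address,
        |    'Phone', if(exists(p.Address, a -> a.Apt = 'Apt1'),
        |                transform(p.Phone, ph -> named_struct('PhoneNum', 7777L)),
        |                p.Phone))))""".stripMargin))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("nested-update").getOrCreate()
    import spark.implicits._
    val json = """{"Register":{"Persons":[{"Name":"Name1","Age":12,"Address":[{"Apt":"Apt1"}],"Phone":[{"PhoneNum":1234}]},{"Name":"Name2","Age":14,"Address":[{"Apt":"Apt2"}],"Phone":[{"PhoneNum":55555}]}]}}"""
    val df = spark.read.json(Seq(json).toDS)
    updatePhones(df).toJSON.show(false)
    spark.stop()
  }
}
```

Because the struct is reconstructed with named_struct, no explode/regroup round-trip is needed and the nesting survives intact.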

Upvotes: 1

Views: 1292

Answers (2)

werner

Reputation: 14845

The idea is to use case classes to convert the nested structure into a set of simple Scala classes that can be handled more easily - or, in Spark terms: use a (typed) Dataset instead of an untyped DataFrame.

case class Phone(var PhoneNum: String)
case class Apt(Apt: String)
case class Person(Name: String, Age: Long, Address: Array[Apt], Phone: Array[Phone])
case class Register(Persons: Array[Person])
case class TopLevel(Register: Register)

Convert the DataFrame into a Dataset and then apply a map call to each entry of the Dataset:

val df = ...
val ds = df.as[TopLevel]
val transformed = ds.map(tl => {
  for (p <- tl.Register.Persons) {
    // Arrays are mutable, so the matching phone entries can be replaced in place
    if (p.Address.contains(Apt("Apt1"))) p.Phone.transform(_ => Phone("7777"))
  }
  tl
})
transformed.toJSON.show(false)

prints:

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                            |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"Register":{"Persons":[{"Name":"Name1","Age":12,"Address":[{"Apt":"Apt1"}],"Phone":[{"PhoneNum":"7777"}]},{"Name":"Name2","Age":14,"Address":[{"Apt":"Apt2"}],"Phone":[{"PhoneNum":"55555"}]}]}}|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

A remark on the data structure/schema in the question:

As the question is written, a DataFrame of registers is used: each entry of the DataFrame contains a single register. It would be more intuitive if the DataFrame contained a list of persons and this list were called "Register". That would lead to a much simpler data structure, and the classes TopLevel and Register could be omitted.
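(Editor's note: to make that remark concrete, here is a sketch of the flatter layout, with field names taken from the question. The per-person update then reduces to a plain Scala function that can be passed to Dataset.map; this version uses an immutable copy instead of the in-place mutation above. The object name is illustrative:)

```scala
object FlatRegister {
  case class Phone(PhoneNum: String)
  case class Apt(Apt: String)
  case class Person(Name: String, Age: Long, Address: Array[Apt], Phone: Array[Phone])

  // With ds: Dataset[Person], the whole job reduces to ds.map(updatePhone _)
  def updatePhone(p: Person): Person =
    if (p.Address.exists(_.Apt == "Apt1"))
      p.copy(Phone = p.Phone.map(_ => Phone("7777")))
    else p
}
```

The copy-based version avoids relying on Array mutability, which makes the transformation easier to unit-test outside Spark.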

Upvotes: 1

SimbaPK

Reputation: 596

The first step is to map your JSON into a DataFrame. Then we create a custom UDF that takes the Apt column, the PhoneNum column and the new phone number as input, and changes the phone number if Apt = Apt1:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, lit, udf}

object UpdatePhoneNum {

  val sparkSession = SparkSession.builder().master("local[*]").getOrCreate()

  def changePhoneNum(Apt: String, oldPhoneNum: Long, newPhoneNum: Long): Long = Apt match {
    case "Apt1" => newPhoneNum
    case _      => oldPhoneNum
  }
  val changePhoneNumUDF = udf(changePhoneNum _)

  def main(args: Array[String]): Unit = {

    val inputJson = "{\"Register\":{\"Persons\":[{\"Name\":\"Name1\",\"Age\":12,\"Address\":[{\"Apt\":\"Apt1\"}],\"Phone\":[{\"PhoneNum\":1234}]},{\"Name\":\"Name2\",\"Age\":14,\"Address\":[{\"Apt\":\"Apt2\"}],\"Phone\":[{\"PhoneNum\":55555}]}]}}"

    import sparkSession.implicits._

    val outputDataFrame = sparkSession.read.option("multiline", true).option("mode", "PERMISSIVE")
      .json(Seq(inputJson).toDS)
      // First layer mapping
      .select(col("Register").getItem("Persons").as("Persons"))
      .withColumn("Persons", explode(col("Persons")))
      // Second layer mapping
      .select(
        col("Persons").getItem("Name").as("Name"),
        col("Persons").getItem("Age").as("Age"),
        col("Persons").getItem("Address").as("Address"),
        col("Persons").getItem("Phone").as("Phone")
      )
      // Last layer mapping
      .select(
        col("Name"), col("Age"),
        col("Address").getItem("Apt").as("Apt"),
        col("Phone").getItem("PhoneNum").as("PhoneNum")
      )
      .withColumn("Apt", explode(col("Apt")))
      .withColumn("PhoneNum", explode(col("PhoneNum")))
      // Apply the UDF to change PhoneNum according to Apt
      .withColumn("PhoneNum", changePhoneNumUDF(col("Apt"), col("PhoneNum"), lit(987654)))

    outputDataFrame.show()
  }
}

Output:

+-----+---+----+--------+
| Name|Age| Apt|PhoneNum|
+-----+---+----+--------+
|Name1| 12|Apt1|  987654|
|Name2| 14|Apt2|   55555|
+-----+---+----+--------+

Upvotes: 0
