haps10

Reputation: 3536

Relational transformations in Spark

I'm trying to use Spark Datasets to load quite a big set of (let's say) person records, of which a subset looks as follows.

+---+-------------+--------+---+
|age|maritalStatus|    name|sex|
+---+-------------+--------+---+
| 35|            M|  Joanna|  F|
| 25|            S|Isabelle|  F|
| 19|            S|    Andy|  M|
| 70|            M|  Robert|  M|
+---+-------------+--------+---+

My need is to have relational transformations, where one column derives its value from other column(s). For example, based on the "age" & "sex" of each person record I need to put Mr. or Ms./Mrs. in front of each "name" attribute. Another example is that for a person with "age" over 60, I need to mark him or her as a senior citizen (derived column "seniorCitizen" set to Y).

My final need for transformed data is as follows:

+---+-------------+-------------+-------------+---+
|age|maritalStatus|         name|seniorCitizen|sex|
+---+-------------+-------------+-------------+---+
| 35|            M|  Mrs. Joanna|            N|  F|
| 25|            S| Ms. Isabelle|            N|  F|
| 19|            S|     Mr. Andy|            N|  M|
| 70|            M|   Mr. Robert|            Y|  M|
+---+-------------+-------------+-------------+---+

Most of the transformations that Spark provides are quite static and not dynamic, for example those defined in the examples here and here.

I'm using Spark Datasets because I'm loading from a relational data source, but if you can suggest a better way of doing this using plain RDDs, please do.

Upvotes: 0

Views: 68

Answers (2)

koiralo

Reputation: 23109

You can use withColumn to add a new column: for seniorCitizen use a when clause, and for updating name use a user-defined function (UDF), as below:

import spark.implicits._

import org.apache.spark.sql.functions._
// create some dummy data
val df = Seq((35, "M", "Joanna", "F"),
    (25, "S", "Isabelle", "F"),
    (19, "S", "Andy", "M"),
    (70, "M", "Robert", "M")
  ).toDF("age", "maritalStatus", "name", "sex")

// create a udf that prefixes the name according to marital status and sex
val append = udf((name: String, maritalStatus:String, sex: String) => {
  if (sex.equalsIgnoreCase("F") &&  maritalStatus.equalsIgnoreCase("M")) s"Mrs. ${name}"
  else if (sex.equalsIgnoreCase("F")) s"Ms. ${name}"
  else s"Mr. ${name}"
})

// derive both columns using withColumn
df.withColumn("name", append($"name", $"maritalStatus", $"sex"))
  .withColumn("seniorCitizen", when($"age" < 60, "N").otherwise("Y")).show

Output:

+---+-------------+------------+---+-------------+
|age|maritalStatus|        name|sex|seniorCitizen|
+---+-------------+------------+---+-------------+
| 35|            M| Mrs. Joanna|  F|            N|
| 25|            S|Ms. Isabelle|  F|            N|
| 19|            S|    Mr. Andy|  M|            N|
| 70|            M|  Mr. Robert|  M|            Y|
+---+-------------+------------+---+-------------+
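Since the udf above just wraps a plain Scala function, the title logic can also be pulled out and checked without a SparkSession. A minimal sketch (the `title` helper name is mine, not part of the answer's code):

```scala
// Hypothetical standalone helper mirroring the udf body above,
// so the prefix rules can be verified without Spark.
def title(name: String, maritalStatus: String, sex: String): String =
  if (sex.equalsIgnoreCase("F") && maritalStatus.equalsIgnoreCase("M")) s"Mrs. $name"
  else if (sex.equalsIgnoreCase("F")) s"Ms. $name"
  else s"Mr. $name"

println(title("Joanna", "M", "F"))   // Mrs. Joanna
println(title("Isabelle", "S", "F")) // Ms. Isabelle
println(title("Robert", "M", "M"))   // Mr. Robert
```

Once the function behaves as expected, wrapping it with `udf(...)` gives exactly the column used in the answer.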

EDIT:

Here is the same transformation without using a UDF:

df.withColumn("name",
  when($"sex" === "F",
    when($"maritalStatus" === "M", concat(lit("Mrs. "), df("name")))
      .otherwise(concat(lit("Ms. "), df("name"))))
    .otherwise(concat(lit("Mr. "), df("name"))))
  .withColumn("seniorCitizen", when($"age" < 60, "N").otherwise("Y"))
Hope this helps!

Upvotes: 2

Ramesh Maharjan

Reputation: 41957

Spark's built-in functions can get this done. You can combine the when, concat, and lit functions as below:

val updateName = when(lower($"maritalStatus") === "m" && lower($"sex") === "f", concat(lit("Mrs. "), $"name"))
  .when(lower($"maritalStatus") === "s" && lower($"sex") === "f", concat(lit("Ms. "), $"name"))
  .otherwise(concat(lit("Mr. "), $"name"))

val updatedDataSet = dataset.withColumn("name", updateName)
  .withColumn("seniorCitizen", when($"age" > 60, "Y").otherwise("N"))

updatedDataSet is the dataset you require.
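On the asker's closing question about plain RDDs: with a case class, the same derivation is a single map over the records. A sketch of the per-record logic, where `Person` and `enrich` are my own names and a plain Scala Seq stands in for the RDD (on a real RDD this would be `rdd.map(enrich)`):

```scala
case class Person(age: Int, maritalStatus: String, name: String, sex: String,
                  seniorCitizen: String = "N")

// Derive the title prefix and seniorCitizen flag for one record.
def enrich(p: Person): Person = {
  val prefix =
    if (p.sex == "F" && p.maritalStatus == "M") "Mrs. "
    else if (p.sex == "F") "Ms. "
    else "Mr. "
  p.copy(name = prefix + p.name,
         seniorCitizen = if (p.age > 60) "Y" else "N")
}

val people = Seq(
  Person(35, "M", "Joanna", "F"),
  Person(70, "M", "Robert", "M"))
people.map(enrich).foreach(println)
```

This keeps the transformation fully typed, at the cost of losing Catalyst's optimization of column expressions, so the DataFrame versions above are usually preferable for large data.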

Upvotes: 1
