Reputation: 3536
I'm trying to use Spark Datasets to load fairly large data of (let's say) persons, where a subset of the data looks as follows.
+---+-------------+--------+---+
|age|maritalStatus|    name|sex|
+---+-------------+--------+---+
| 35|            M|  Joanna|  F|
| 25|            S|Isabelle|  F|
| 19|            S|    Andy|  M|
| 70|            M|  Robert|  M|
+---+-------------+--------+---+
My need is to have relational transformations where one column derives its value from other column(s). For example, based on the "age" and "sex" of each person record I need to put Mr. or Ms./Mrs. in front of each "name" attribute. Another example is that for a person with "age" over 60, I need to mark him or her as a senior citizen (derived column "seniorCitizen" set to Y).
The final transformed data I need is as follows:
+---+-------------+------------+-------------+---+
|age|maritalStatus|        name|seniorCitizen|sex|
+---+-------------+------------+-------------+---+
| 35|            M| Mrs. Joanna|            N|  F|
| 25|            S|Ms. Isabelle|            N|  F|
| 19|            S|    Mr. Andy|            N|  M|
| 70|            M|  Mr. Robert|            Y|  M|
+---+-------------+------------+-------------+---+
Most of the transformations that Spark provides are quite static and not dynamic. For example, as defined in the examples here and here.
I'm using Spark Datasets because I'm loading from a relational data source, but if you can suggest a better way of doing this using plain RDDs, please do.
Upvotes: 0
Views: 68
Reputation: 23109
You can use withColumn to add the new seniorCitizen column with a when clause, and to update name you can use a user-defined function (udf), as below:
import spark.implicits._
import org.apache.spark.sql.functions._

// create some dummy data
val df = Seq(
  (35, "M", "Joanna", "F"),
  (25, "S", "Isabelle", "F"),
  (19, "S", "Andy", "M"),
  (70, "M", "Robert", "M")
).toDF("age", "maritalStatus", "name", "sex")

// create a udf that prefixes the name according to maritalStatus and sex
val append = udf((name: String, maritalStatus: String, sex: String) => {
  if (sex.equalsIgnoreCase("F") && maritalStatus.equalsIgnoreCase("M")) s"Mrs. ${name}"
  else if (sex.equalsIgnoreCase("F")) s"Ms. ${name}"
  else s"Mr. ${name}"
})

// overwrite name and add seniorCitizen using withColumn
df.withColumn("name", append($"name", $"maritalStatus", $"sex"))
  .withColumn("seniorCitizen", when($"age" < 60, "N").otherwise("Y"))
  .show()
Output:
+---+-------------+------------+---+-------------+
|age|maritalStatus|        name|sex|seniorCitizen|
+---+-------------+------------+---+-------------+
| 35|            M| Mrs. Joanna|  F|            N|
| 25|            S|Ms. Isabelle|  F|            N|
| 19|            S|    Mr. Andy|  M|            N|
| 70|            M|  Mr. Robert|  M|            Y|
+---+-------------+------------+---+-------------+
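If you also want the columns in the same order as your expected output, you can add a select at the end; a small sketch on top of the snippet above:
// reorder the columns to match the expected output
df.withColumn("name", append($"name", $"maritalStatus", $"sex"))
  .withColumn("seniorCitizen", when($"age" < 60, "N").otherwise("Y"))
  .select("age", "maritalStatus", "name", "seniorCitizen", "sex")
  .show()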
EDIT:
Here is the same result without using a UDF (built-in column functions are generally preferable, since Catalyst can optimize them):
df.withColumn("name",
when($"sex" === "F", when($"maritalStatus" === "M", concat(lit("Ms. "), df("name"))).otherwise(concat(lit("Ms. "), df("name"))))
.otherwise(concat(lit("Ms. "), df("name"))))
.withColumn("seniorCitizen", when($"age" < 60, "N").otherwise("Y"))
Hope this helps!
Upvotes: 2
Reputation: 41957
Spark's built-in functions can get this done. You can combine the when, concat, and lit functions as shown below:
// build the title prefix from maritalStatus and sex
val updateName = when(lower($"maritalStatus") === "m" && lower($"sex") === "f", concat(lit("Mrs. "), $"name"))
  .otherwise(when(lower($"maritalStatus") === "s" && lower($"sex") === "f", concat(lit("Ms. "), $"name"))
    .otherwise(when(lower($"sex") === "m", concat(lit("Mr. "), $"name"))))

val updatedDataSet = dataset.withColumn("name", updateName)
  .withColumn("seniorCitizen", when($"age" > 60, "Y").otherwise("N"))
updatedDataSet is your required dataset.
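Since you mentioned loading into Datasets, the same logic can also be written against a typed Dataset as a map over case classes; a rough sketch, assuming spark.implicits._ is in scope and that Person and EnrichedPerson are hypothetical case classes mirroring your columns:
// rough sketch: Person and EnrichedPerson are hypothetical case classes
case class Person(age: Int, maritalStatus: String, name: String, sex: String)
case class EnrichedPerson(age: Int, maritalStatus: String, name: String,
                          seniorCitizen: String, sex: String)

val people = dataset.as[Person]   // typed view of the same data

val enriched = people.map { p =>
  val title =
    if (p.sex.equalsIgnoreCase("F") && p.maritalStatus.equalsIgnoreCase("M")) "Mrs. "
    else if (p.sex.equalsIgnoreCase("F")) "Ms. "
    else "Mr. "
  EnrichedPerson(p.age, p.maritalStatus, title + p.name,
                 if (p.age > 60) "Y" else "N", p.sex)
}
Whether this is nicer than the Column-based version is mostly a matter of taste; the built-in functions above usually optimize better.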
Upvotes: 1