Darshan Shah

Reputation: 91

Merge rows in a spark scala Dataframe

I have data like the following:

ID  Name    Passport    Country  License    UpdatedtimeStamp
1   Ostrich 12345       -       ABC         11-02-2018
1   -       -           -       BCD         10-02-2018
1   Shah    12345       -       -           12-02-2018
2   PJ      -           ANB     a           10-02-2018

The required output is:

ID  Name    Passport    Country  License    UpdatedtimeStamp
1   Shah    12345       -       ABC         12-02-2018
2   PJ      -           ANB     a           10-02-2018

Basically, rows with the same ID should be merged: for each column, the output should hold the value from the most recently updated row in which that column is not null. If every value in a column is null, null should be retained.
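To pin down the rule, here is a minimal plain-Scala sketch (no Spark) of the merge as I read it. The `Rec` case class, the `mergeById` helper and the "MM-dd-yyyy" date pattern are assumptions made for illustration; `None` stands for the "-" cells.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical record type for illustration; None stands for a "-" (null) cell.
final case class Rec(id: Int, name: Option[String], passport: Option[String],
                     country: Option[String], license: Option[String], updated: String)

// Assumption: month-day-year. The sample data orders the same way under day-month-year.
val fmt = DateTimeFormatter.ofPattern("MM-dd-yyyy")

def mergeById(rows: Seq[Rec]): Seq[Rec] =
  rows.groupBy(_.id).toSeq.sortBy(_._1).map { case (id, group) =>
    // Newest record first.
    val newestFirst = group.sortBy(r => LocalDate.parse(r.updated, fmt).toEpochDay).reverse
    // First non-empty value in newest-first order; None if every value is empty.
    def pick(f: Rec => Option[String]): Option[String] =
      newestFirst.flatMap(r => f(r)).headOption
    Rec(id, pick(_.name), pick(_.passport), pick(_.country), pick(_.license),
        newestFirst.head.updated)
  }

val sample = Seq(
  Rec(1, Some("Ostrich"), Some("12345"), None, Some("ABC"), "11-02-2018"),
  Rec(1, None, None, None, Some("BCD"), "10-02-2018"),
  Rec(1, Some("Shah"), Some("12345"), None, None, "12-02-2018"),
  Rec(2, Some("PJ"), None, Some("ANB"), Some("a"), "10-02-2018")
)

val mergedSample = mergeById(sample)
mergedSample.foreach(println)
```

For ID 1 the License comes from the 11-02-2018 row, because the newest row has no License.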

Please suggest a solution, preferably one that does not use Spark SQL Window functions, as I need it to be very fast.

Upvotes: 6

Views: 8439

Answers (2)

mikeL

Reputation: 1114

If you want to stay completely in Spark SQL:

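import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// assumes a SparkSession named `spark`, as in spark-shell
import spark.implicits._

val df = Seq(
  (1, Some("ostrich"), Some(12345), None, Some("ABC"), "11-02-2018"),
  (1, None, None, None, Some("BCD"), "10-02-2018"),
  (1, Some("Shah"), Some(12345), None, None, "12-02-2018"),
  (2, Some("PJ"), None, Some("ANB"), Some("a"), "10-02-2018")
).toDF("ID", "Name", "Passport", "Country", "License", "UpdatedtimeStamp")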


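// parse the timestamp string into a date so rows can be ordered by recency
val df1 = df.withColumn("date", to_date($"UpdatedtimeStamp", "MM-dd-yyyy")).drop($"UpdatedtimeStamp")

val win = Window.partitionBy("ID").orderBy($"date".desc)

// order rows newest-first within each ID
val df2 = df1.select($"*", row_number().over(win).as("r")).orderBy($"ID", $"r").drop("r")
// collect_list skips nulls, so each list starts with the newest non-null value
// (this relies on the sort order surviving the groupBy, which Spark does not strictly guarantee)
val exprs = df2.columns.drop(1).map(x => collect_list(x).as(x + "_grp"))

val df3 = df2.groupBy("ID").agg(exprs.head, exprs.tail: _*)

// take the first element of each list and restore the original column name
val exprs2 = df3.columns.drop(1).map(x => col(x)(0).as(x.stripSuffix("_grp")))

df3.select((Array(col(df2.columns(0))) ++ exprs2): _*).show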


+---+----+--------+-------+-------+----------+
| ID|Name|Passport|Country|License|      date|
+---+----+--------+-------+-------+----------+
|  1|Shah|   12345|   null|    ABC|2018-12-02|
|  2|  PJ|    null|    ANB|      a|2018-10-02|
+---+----+--------+-------+-------+----------+

Upvotes: 2

Ramesh Maharjan

Reputation: 41987

You can get this result by defining a UDF and passing it the collected struct columns; the UDF sorts the structs by timestamp and fills nulls with the latest non-null values. (Comments in the code explain each step.)

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
//udf function definition
def sortAndAggUdf = udf((structs: Seq[Row])=>{
  //sorting the collected list by timestamp in descending order
  val sortedStruct = structs.sortBy(str => str.getAs[Long]("UpdatedtimeStamp"))(Ordering[Long].reverse)
  //selecting the first struct and casting to out case class
  val first = out(sortedStruct(0).getAs[String]("Name"), sortedStruct(0).getAs[String]("Passport"), sortedStruct(0).getAs[String]("Country"), sortedStruct(0).getAs[String]("License"), sortedStruct(0).getAs[Long]("UpdatedtimeStamp"))
  //aggregation for checking nulls and populating first not null value
  sortedStruct
    .foldLeft(first)((x, y) => {
      out(
        if(x.Name == null || x.Name.isEmpty) y.getAs[String]("Name") else x.Name,
        if(x.Passport == null || x.Passport.isEmpty) y.getAs[String]("Passport") else x.Passport,
        if(x.Country == null || x.Country.isEmpty) y.getAs[String]("Country") else x.Country,
        if(x.License == null || x.License.isEmpty) y.getAs[String]("License") else x.License,
        x.UpdatedtimeStamp)
    })
})
//making the rest of the columns as one column and changing the UpdatedtimeStamp column to long for sorting in udf
df.select(col("ID"), struct(col("Name"), col("Passport"), col("Country"), col("License"), unix_timestamp(col("UpdatedtimeStamp"), "MM-dd-yyyy").as("UpdatedtimeStamp")).as("struct"))
    //grouping and collecting the structs and passing to udf function for manipulation
    .groupBy("ID").agg(sortAndAggUdf(collect_list("struct")).as("struct"))
    //separating the aggregated columns to separate columns
    .select(col("ID"), col("struct.*"))
    //getting the date in correct format
    .withColumn("UpdatedtimeStamp", date_format(col("UpdatedtimeStamp").cast("timestamp"), "MM-dd-yyyy"))
    .show(false)

This should give you:

+---+----+--------+-------+-------+----------------+
|ID |Name|Passport|Country|License|UpdatedtimeStamp|
+---+----+--------+-------+-------+----------------+
|1  |Shah|12345   |null   |ABC    |12-02-2018      |
|2  |PJ  |null    |ANB    |a      |10-02-2018      |
+---+----+--------+-------+-------+----------------+

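The UDF also needs the following case class, which must be defined before the UDF. Note that it treats all four value columns, including Passport, as strings: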

case class out(Name: String, Passport: String, Country: String, License: String, UpdatedtimeStamp: Long)
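
For comparison, here is a hedged sketch that needs neither a Window nor a UDF. It is not either answer's method but a different technique: for each column, take the `max` of a `(timestamp, value)` struct over the rows where the value is not null, then pull the value back out. The session setup, the `ts` helper column, the string-typed Passport and the "MM-dd-yyyy" pattern are assumptions; if two rows share a timestamp, the tie is broken by the value itself.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Reuses the running session in spark-shell; the local master is an assumption for standalone runs.
val spark = SparkSession.builder().master("local[*]").appName("merge-rows-sketch").getOrCreate()
import spark.implicits._

val input = Seq(
  (1, Option("Ostrich"), Option("12345"), Option.empty[String], Option("ABC"), "11-02-2018"),
  (1, Option.empty[String], Option.empty[String], Option.empty[String], Option("BCD"), "10-02-2018"),
  (1, Option("Shah"), Option("12345"), Option.empty[String], Option.empty[String], "12-02-2018"),
  (2, Option("PJ"), Option.empty[String], Option("ANB"), Option("a"), "10-02-2018")
).toDF("ID", "Name", "Passport", "Country", "License", "UpdatedtimeStamp")

// Numeric timestamp so structs compare by recency first (helper column, dropped by the aggregation).
val withTs = input.withColumn("ts", unix_timestamp(col("UpdatedtimeStamp"), "MM-dd-yyyy"))

val valueCols = Seq("Name", "Passport", "Country", "License")

// when(...) without otherwise yields null for null values, and max ignores nulls,
// so each aggregate keeps the newest non-null value (or null if there is none).
val latestNonNull = valueCols.map { c =>
  max(when(col(c).isNotNull, struct(col("ts"), col(c)))).getField(c).as(c)
}

// The original timestamp string of the newest row in each group.
val latestStamp = max(struct(col("ts"), col("UpdatedtimeStamp")))
  .getField("UpdatedtimeStamp").as("UpdatedtimeStamp")

val merged = withTs.groupBy("ID")
  .agg(latestNonNull.head, (latestNonNull.tail :+ latestStamp): _*)

merged.orderBy("ID").show(false)
```

This runs as a single `groupBy` aggregation, so it may suit the "no Window functions" requirement, though whether it is actually faster would need measuring on real data.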

Upvotes: 1
