Reputation: 91
Merge rows in a Spark DataFrame
I have data like the following:
ID  Name     Passport  Country  License  UpdatedtimeStamp
1   Ostrich  12345     -        ABC      11-02-2018
1   -        -         -        BCD      10-02-2018
1   Shah     12345     -        -        12-02-2018
2   PJ       -         ANB      a        10-02-2018
The required output is:
ID  Name  Passport  Country  License  UpdatedtimeStamp
1   Shah  12345     -        ABC      12-02-2018
2   PJ    -         ANB      a        10-02-2018
Basically, rows with the same ID should be merged: for each column, the output should hold the most recently updated non-null value, and if a column is null in every record for that ID, it should stay null.
Please suggest an approach. Ideally it should not use Spark SQL window functions, as I need it to be very fast.
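To pin the rule down, here is a minimal plain-Scala sketch of the intended semantics, with no Spark involved. The `Rec` case class and `MergeRule` object are hypothetical names, `None` stands for `-`, and the dates are assumed to be dd-MM-yyyy (reading them as MM-dd-yyyy gives the same order for this sample).

```scala
import java.time.LocalDate

// Hypothetical in-memory model of one input row; None stands for "-" (null).
final case class Rec(
    id: Int,
    name: Option[String],
    passport: Option[String],
    country: Option[String],
    license: Option[String],
    updated: LocalDate)

object MergeRule {
  // Per ID: order the rows newest first, then take the first non-null value of each column.
  // A column that is null in every row of the group stays None.
  def merge(rows: Seq[Rec]): Seq[Rec] =
    rows.groupBy(_.id).toSeq.sortBy(_._1).map { case (id, group) =>
      val newestFirst = group.sortBy(_.updated.toEpochDay)(Ordering[Long].reverse)
      def latest(col: Rec => Option[String]): Option[String] =
        newestFirst.map(col).collectFirst { case Some(v) => v }
      Rec(id, latest(_.name), latest(_.passport), latest(_.country),
        latest(_.license), newestFirst.head.updated)
    }

  // The question's sample, assuming dd-MM-yyyy dates.
  val sample: Seq[Rec] = Seq(
    Rec(1, Some("Ostrich"), Some("12345"), None, Some("ABC"), LocalDate.of(2018, 2, 11)),
    Rec(1, None, None, None, Some("BCD"), LocalDate.of(2018, 2, 10)),
    Rec(1, Some("Shah"), Some("12345"), None, None, LocalDate.of(2018, 2, 12)),
    Rec(2, Some("PJ"), None, Some("ANB"), Some("a"), LocalDate.of(2018, 2, 10)))
}
```

Any Spark solution has to compute the same per-`ID` result; the question is only how to do it efficiently.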
Upvotes: 6
Views: 8439
Reputation: 1114
If you want to stay completely within Spark SQL (this version does use a window function to order the rows within each ID):
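import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // toDF and $"..." need a SparkSession named spark in scope (as in spark-shell)
val df = Seq((1, Some("Ostrich"), Some(12345), None, Some("ABC"), "11-02-2018"),
  (1, None, None, None, Some("BCD"), "10-02-2018"),
  (1, Some("Shah"), Some(12345), None, None, "12-02-2018"),
  (2, Some("PJ"), None, Some("ANB"), Some("a"), "10-02-2018")).toDF("ID", "Name", "Passport", "Country", "License", "UpdatedtimeStamp")
// parse the date strings so they sort chronologically
val df1 = df.withColumn("date", to_date($"UpdatedtimeStamp", "MM-dd-yyyy")).drop($"UpdatedtimeStamp")
// number the rows of each ID from newest to oldest, then sort by that number
val win = Window.partitionBy("ID").orderBy($"date".desc)
val df2 = df1.select($"*", row_number().over(win).as("r")).orderBy($"ID", $"r").drop("r")
// collect every column except ID into a list per ID; collect_list skips nulls, so element 0
// is the newest non-null value as long as the sort order survives the groupBy
// (Spark does not guarantee collect_list order after a shuffle)
val exprs = df2.columns.drop(1).map(x => collect_list(x).as(x + "_grp"))
val df3 = df2.groupBy("ID").agg(exprs.head, exprs.tail: _*)
// take the first element of each list and restore the original column names
val exprs2 = df3.columns.drop(1).map(x => col(x)(0).as(x.stripSuffix("_grp")))
df3.select((Array(col(df2.columns(0))) ++ exprs2): _*).show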
+---+----+--------+-------+-------+----------+
| ID|Name|Passport|Country|License| date|
+---+----+--------+-------+-------+----------+
| 1|Shah| 12345| null| ABC|2018-12-02|
| 2| PJ| null| ANB| a|2018-10-02|
+---+----+--------+-------+-------+----------+
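Because the order of `collect_list` output is not guaranteed after a shuffle, an order-independent alternative (which also needs no window) is to reduce each column to the non-null value carrying the greatest timestamp. That reduction is associative and commutative, so it fits a plain `groupBy` aggregation; in Spark it is commonly expressed as a `max` over a struct of (date, value). The sketch below shows only the reduction itself on plain Scala collections, not Spark code; `LatestNonNull` and `latestNonNull` are hypothetical names.

```scala
// Hypothetical helper: the value with the latest timestamp, ignoring nulls (None).
object LatestNonNull {
  type Stamped = (Long, String) // (timestamp, value)

  // Lexicographic max on (timestamp, value), so the result never depends on input order.
  private def newer(a: Stamped, b: Stamped): Stamped =
    if (Ordering[Stamped].gteq(a, b)) a else b

  def latestNonNull(cells: Seq[(Long, Option[String])]): Option[String] =
    cells
      .collect { case (ts, Some(v)) => (ts, v) } // drop nulls, as max() ignores them
      .reduceOption(newer)                      // one pass, no sorting
      .map(_._2)                                // None if every cell was null
}
```

For example, the `License` cells of ID 1, fed in any order, reduce to `Some("ABC")`, and an all-null column such as `Country` reduces to `None`.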
Upvotes: 2
Reputation: 41987
You can achieve this with a UDF: collect each ID's rows as structs, pass the collected list to the UDF, and have it sort the structs by timestamp (newest first) and fill each null with the next available non-null value. Comments in the code explain each step.
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
// UDF definition
def sortAndAggUdf = udf((structs: Seq[Row])=>{
//sorting the collected list by timestamp in descending order
val sortedStruct = structs.sortBy(str => str.getAs[Long]("UpdatedtimeStamp"))(Ordering[Long].reverse)
// taking the newest struct and converting it to the out case class
val first = out(sortedStruct(0).getAs[String]("Name"), sortedStruct(0).getAs[String]("Passport"), sortedStruct(0).getAs[String]("Country"), sortedStruct(0).getAs[String]("License"), sortedStruct(0).getAs[Long]("UpdatedtimeStamp"))
// folding over the sorted structs: each null or empty field takes the first non-null value, newest first
sortedStruct
.foldLeft(first)((x, y) => {
out(
if(x.Name == null || x.Name.isEmpty) y.getAs[String]("Name") else x.Name,
if(x.Passport == null || x.Passport.isEmpty) y.getAs[String]("Passport") else x.Passport,
if(x.Country == null || x.Country.isEmpty) y.getAs[String]("Country") else x.Country,
if(x.License == null || x.License.isEmpty) y.getAs[String]("License") else x.License,
x.UpdatedtimeStamp)
})
})
// pack the remaining columns into one struct column, converting UpdatedtimeStamp to a Long (seconds since the epoch) so the UDF can sort on it
df.select(col("ID"), struct(col("Name"), col("Passport"), col("Country"), col("License"), unix_timestamp(col("UpdatedtimeStamp"), "MM-dd-yyyy").as("UpdatedtimeStamp")).as("struct"))
// group by ID, collect each group's structs into a list, and pass the list to the UDF
.groupBy("ID").agg(sortAndAggUdf(collect_list("struct")).as("struct"))
// expand the returned struct back into separate columns
.select(col("ID"), col("struct.*"))
// convert the Long back to an MM-dd-yyyy date string
.withColumn("UpdatedtimeStamp", date_format(col("UpdatedtimeStamp").cast("timestamp"), "MM-dd-yyyy"))
.show(false)
which should give you
+---+----+--------+-------+-------+----------------+
|ID |Name|Passport|Country|License|UpdatedtimeStamp|
+---+----+--------+-------+-------+----------------+
|1 |Shah|12345 |null |ABC |12-02-2018 |
|2 |PJ |null |ANB |a |10-02-2018 |
+---+----+--------+-------+-------+----------------+
Finally, the UDF needs a case class for its return type:
case class out(Name: String, Passport: String, Country: String, License: String, UpdatedtimeStamp: Long)
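The answer converts `UpdatedtimeStamp` to a Long before sorting because comparing the raw `MM-dd-yyyy` strings does not give chronological order once the dates span more than one year. A small plain-Scala illustration, using `java.time` rather than Spark's `unix_timestamp`, with made-up dates; `StampOrder` is a hypothetical name.

```scala
import java.time.{LocalDate, ZoneOffset}
import java.time.format.DateTimeFormatter

object StampOrder {
  // Same pattern the answer passes to unix_timestamp and date_format.
  private val fmt = DateTimeFormatter.ofPattern("MM-dd-yyyy")

  // Seconds since the epoch at UTC midnight, analogous to the Long sorted inside the UDF.
  def toEpochSeconds(s: String): Long =
    LocalDate.parse(s, fmt).atStartOfDay(ZoneOffset.UTC).toEpochSecond

  // Lexicographic comparison of the raw strings: wrong across years.
  def newestByString(xs: Seq[String]): String = xs.max

  // Comparison of the parsed instants: chronological.
  def newestByTime(xs: Seq[String]): String = xs.maxBy(toEpochSeconds)
}
```

With `Seq("11-02-2018", "12-02-2017")`, the string comparison picks `"12-02-2017"` while the parsed comparison correctly picks `"11-02-2018"`.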
Upvotes: 1