sandevfares

Reputation: 245

How to use Spark lag and lead over group by and order by

I use:

`dataset.withColumn("lead",lead(dataset.col(start_date),1).over(orderBy(start_date)));`

I just want to add a group by trackId so that lead works within each group, like any aggregate function:

+---------+------------+----------+----------+
| trackId | start_time | end_time | lead     |
+---------+------------+----------+----------+
| 1       | 12:00:00   | 12:04:00 | 12:05:00 |
| 1       | 12:05:00   | 12:08:00 | 12:20:00 |
| 1       | 12:20:00   | 12:22:00 | null     |
| 2       | 13:00:00   | 13:04:00 | 13:05:00 |
| 2       | 13:05:00   | 13:08:00 | 13:20:00 |
| 2       | 13:20:00   | 13:22:00 | null     |
+---------+------------+----------+----------+

Any help on how to do that?

Upvotes: 8

Views: 11009

Answers (2)

Ramesh Maharjan

Reputation: 41957

All you are missing is the Window object and a partitionBy call on it:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
dataset.withColumn("lead",lead(col("start_time"),1).over(Window.partitionBy("trackId").orderBy("start_time")))

Upvotes: 9

koiralo

Reputation: 23109

You need to use a Window specification:

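// Imports needed outside spark-shell; `spark` is assumed to be an existing SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(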
  (1, "12:00:00", "12:04:00"),
  (1, "12:05:00", "12:08:00"),
  (1, "12:20:00", "12:22:00"),
  (2, "13:00:00", "13:04:00"),
  (2, "13:05:00", "13:08:00"),
  (2, "13:20:00", "13:22:00")
).toDF( "trackId","start_time","end_time" )

val window  = Window.partitionBy("trackId").orderBy("start_time")

df.withColumn("lead",lead(col("start_time"),1).over(window))

If you don't want null, you can also pass a default value as the third argument: lead($"start_time", 1, defaultValue)

Result:

+-------+----------+--------+--------+
|trackId|start_time|end_time|lead    |
+-------+----------+--------+--------+
|1      |12:00:00  |12:04:00|12:05:00|
|1      |12:05:00  |12:08:00|12:20:00|
|1      |12:20:00  |12:22:00|null    |
|2      |13:00:00  |13:04:00|13:05:00|
|2      |13:05:00  |13:08:00|13:20:00|
|2      |13:20:00  |13:22:00|null    |
+-------+----------+--------+--------+
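
Since the question title also asks about lag, here is a minimal sketch that applies both lag and lead over the same per-track window, using the default-value form of lead mentioned above. The object name `LagLeadSketch`, the helper `withNeighbours`, the column names `prev_start` and `next_start`, the `"n/a"` placeholder, and the local session settings are all assumptions for illustration, not part of the original question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, lead}

object LagLeadSketch {
  // Hypothetical helper: adds the previous and next start_time within each trackId.
  def withNeighbours(df: DataFrame): DataFrame = {
    // Zero-padded HH:mm:ss strings sort the same way as the times they represent.
    val byTrack = Window.partitionBy("trackId").orderBy("start_time")
    df
      // lag looks one row back; the first row of each track has no predecessor, so it gets null.
      .withColumn("prev_start", lag(col("start_time"), 1).over(byTrack))
      // lead looks one row ahead; "n/a" (an arbitrary placeholder) replaces null on the last row.
      .withColumn("next_start", lead(col("start_time"), 1, "n/a").over(byTrack))
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; app name and master are assumptions.
    val spark = SparkSession.builder()
      .appName("lag-lead-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      (1, "12:00:00", "12:04:00"),
      (1, "12:05:00", "12:08:00"),
      (1, "12:20:00", "12:22:00"),
      (2, "13:00:00", "13:04:00"),
      (2, "13:05:00", "13:08:00"),
      (2, "13:20:00", "13:22:00")
    ).toDF("trackId", "start_time", "end_time")

    withNeighbours(df).orderBy("trackId", "start_time").show(false)
    spark.stop()
  }
}
```

With this window, `prev_start` is null on the first row of each track and `next_start` is `"n/a"` on the last row, because neither function looks across partition boundaries.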

Upvotes: 3
