Rami

Reputation: 8314

SPARK, DataFrame: difference of Timestamp columns over consecutive rows

I have a DataFrame as follows:

+---+---------------------+---------------------+
|id |initDate             |endDate              |
+---+---------------------+---------------------+
|138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0|
|138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0|
|138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0|
|138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0|
|138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0|
|138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0|
|138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0|
|138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0|
|138|2016-10-04 00:00:00.0|null                 |
+---+---------------------+---------------------+

The rows are ordered by id and then by initDate, in ascending order. Both the initDate and endDate columns are of Timestamp type. For illustrative purposes, I have only shown the records belonging to one id value.

My goal is to add a new column showing, for each id, the difference (in days) between the initDate of each row and the endDate of the previous row.

If there is no previous row, the value should be -1.

The output should look like this:

+---+---------------------+---------------------+----------+
|id |initDate             |endDate              |difference|
+---+---------------------+---------------------+----------+
|138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0|-1        |
|138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0|11        |
|138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0|12        |
|138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0|0         |
|138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0|7         |
|138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0|4         |
|138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0|8         |
|138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0|12        |
|138|2016-10-04 00:00:00.0|null                 |7         |
+---+---------------------+---------------------+----------+

I am thinking of using a window function to partition the records by id, but I cannot figure out how to do the next steps.

Upvotes: 6

Views: 4525

Answers (3)

vikrant rana

Reputation: 4674

Just an addition to the previous good answers, in case anyone wants to try this with Spark SQL or on Hive.

select tab.tran_id, tab.init_date, tab.end_date,
       coalesce(tab.day_diff, -1) as day_difference
from (select *,
             datediff(init_date,
                      lag(end_date, 1) over (partition by tran_id order by init_date)) as day_diff
      from your_table) tab;
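For reference, the same query can be run from Scala via spark.sql after registering the DataFrame as a temporary view (a minimal sketch; the view name your_table, the DataFrame name df, and the session name spark are placeholders):

// Register the DataFrame so the SQL engine can see it
df.createOrReplaceTempView("your_table")

spark.sql("""
  select tab.tran_id, tab.init_date, tab.end_date,
         coalesce(tab.day_diff, -1) as day_difference
  from (select *,
               datediff(init_date,
                        lag(end_date, 1) over (partition by tran_id order by init_date)) as day_diff
        from your_table) tab
""").show(false)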

Upvotes: 0

user6022341

Reputation:

Try:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
// the $"..." column syntax assumes import spark.implicits._ is in scope

// Order by initDate rather than endDate: the last endDate is null,
// and nulls sort first in ascending order
val w = Window.partitionBy("id").orderBy("initDate")

// datediff returns the gap in whole days (date_sub would expect an
// integer day count, not the lagged timestamp column)
df.withColumn("difference", datediff($"initDate", lag($"endDate", 1).over(w)))
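lag yields null for the first row of each partition; to get the -1 requested in the question, the difference can be wrapped in coalesce (a minimal sketch, reusing df and the window w above):

// Fall back to -1 when there is no previous row in the partition
df.withColumn("difference",
  coalesce(datediff($"initDate", lag($"endDate", 1).over(w)), lit(-1)))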

Upvotes: 5

Rami
Rami

Reputation: 8314

Thanks to the hint from @lostInOverflow, I came up with the following solution:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

// Partition by id and order by initDate so lag looks at the previous interval
val w = Window.partitionBy("id").orderBy("initDate")
// endDate of the previous row within each partition (null for the first row)
val previousEnd = lag($"endDate", 1).over(w)
filteredDF.withColumn("prev", previousEnd)
          .withColumn("difference", datediff($"initDate", $"prev"))
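The first row of each id has no previous endDate, so its difference comes out as null rather than the -1 in the desired output; if needed, the null can be filled afterwards (a sketch continuing from the code above):

filteredDF.withColumn("prev", previousEnd)
          .withColumn("difference", datediff($"initDate", $"prev"))
          .na.fill(-1, Seq("difference")) // map each partition's leading null to -1
          .drop("prev")                   // optionally drop the helper column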

Upvotes: 7
