Reputation: 342
I have a DataFrame df:
id, date, item
1, 20180101, A
1, 20180102, A
1, 20180103, B
1, 20180104, A
2, 20180101, C
2, 20180102, D
2, 20180103, D
2, 20180104, D
and I would like to create a new column, streak, which contains the length of each user's current run of the same item:
id, date, item, streak
1, 20180101, A, 1
1, 20180102, A, 2
1, 20180103, B, 1
1, 20180104, A, 1
2, 20180101, C, 1
2, 20180102, D, 1
2, 20180103, D, 2
2, 20180104, D, 3
I can use the window function row_number and partition by id and item to cumulatively count each id-item pair, but this will not restart the count when a new item appears.
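A minimal sketch of that attempt, assuming a local SparkSession. The session setup and the name naive are placeholders for illustration only. It shows the count for id = 1, item = A continuing to 3 on 20180104 instead of restarting at 1.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Assumption: a throwaway local session; any existing SparkSession would do.
val spark = SparkSession.builder().master("local[*]").appName("streak-question").getOrCreate()
import spark.implicits._

// The sample data from above, with date kept as a yyyyMMdd string.
val df = Seq(
  (1, "20180101", "A"), (1, "20180102", "A"), (1, "20180103", "B"), (1, "20180104", "A"),
  (2, "20180101", "C"), (2, "20180102", "D"), (2, "20180103", "D"), (2, "20180104", "D")
).toDF("id", "date", "item")

// Cumulative count per (id, item): it never resets when the item changes in between.
val w = Window.partitionBy("id", "item").orderBy("date")
val naive = df.withColumn("streak", row_number().over(w)).orderBy("id", "date")

naive.show(false) // the row (1, 20180104, A) gets 3 here, but 1 is wanted
```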
Upvotes: 1
Views: 340
Reputation: 13591
My best solution is this:
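import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// w1: all rows of one (id, item) pair; w2: rows of one streak within that pair
val w1 = Window.partitionBy("id", "item").orderBy("date")
val w2 = Window.partitionBy("id", "item", "index").orderBy("date")

df.withColumn("lag_date", lag("date", 1, "").over(w1))
  // jump = 1 when a previous row exists and it is not exactly one day earlier
  .withColumn("jump", not(col("lag_date") === lit("") || date_add(to_date(col("lag_date"), "yyyyMMdd"), 1) === to_date(col("date"), "yyyyMMdd")).cast("int"))
  // the running sum of jumps numbers the streaks within each (id, item) pair
  .withColumn("index", sum("jump").over(w1))
  .withColumn("streak", row_number().over(w2))
  .orderBy("id", "date")
  .show(false)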
The jump column is used to calculate the index column, where index identifies which streak a row belongs to within its id and item. For example, for id = 1 and item = A there should be 2 indices. The indices 0 and 1 indicate the first streak, from date = 20180101 to 20180102, and the second streak, starting at date = 20180104, respectively. If there were another record for the same id and item with date = 20180105, it would also have index = 1 and would continue that streak with streak = 2.
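To make the jump, index, and streak logic concrete without Spark, here is a rough sketch of the same computation on plain Scala collections. The Event case class and the StreakSketch object are hypothetical names introduced only for illustration, and the sketch assumes dates are yyyyMMdd strings; it mirrors the logic rather than the Spark API.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical row type mirroring the question's columns.
final case class Event(id: Int, date: String, item: String)

object StreakSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyyMMdd")

  // Returns (event, jump, index, streak) for every input event, ordered by id and date.
  def streaks(events: Seq[Event]): Seq[(Event, Int, Int, Int)] =
    events
      .groupBy(e => (e.id, e.item))              // plays the role of partitionBy("id", "item")
      .values
      .flatMap { group =>
        val sorted = group.toVector.sortBy(_.date) // plays the role of orderBy("date")
        var prev: Option[LocalDate] = None
        var index = 0
        var streak = 0
        sorted.map { e =>
          val d = LocalDate.parse(e.date, fmt)
          // jump = 1 when a previous row exists and it is not exactly one day earlier
          val jump = if (prev.exists(p => p.plusDays(1) != d)) 1 else 0
          index += jump                            // running sum of jumps
          // position within the current streak: restart on the first row or on a jump
          streak = if (prev.isEmpty || jump == 1) 1 else streak + 1
          prev = Some(d)
          (e, jump, index, streak)
        }
      }
      .toSeq
      .sortBy { case (e, _, _, _) => (e.id, e.date) }
}
```

Applied to the eight rows from the question, this yields streak values 1, 2, 1, 1 for id = 1 and 1, 1, 2, 3 for id = 2.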
The result is:
+---+--------+----+--------+----+-----+------+
|id |date    |item|lag_date|jump|index|streak|
+---+--------+----+--------+----+-----+------+
|1  |20180101|A   |        |0   |0    |1     |
|1  |20180102|A   |20180101|0   |0    |2     |
|1  |20180103|B   |        |0   |0    |1     |
|1  |20180104|A   |20180102|1   |1    |1     |
|2  |20180101|C   |        |0   |0    |1     |
|2  |20180102|D   |        |0   |0    |1     |
|2  |20180103|D   |20180102|0   |0    |2     |
|2  |20180104|D   |20180103|0   |0    |3     |
+---+--------+----+--------+----+-----+------+
I have kept the temporary columns (lag_date, jump, index) to show how the code works.
Upvotes: 1