Regalia9363

Reputation: 342

Spark Count Streak of Column Value

I have a DataFrame df:

id, date, item

1, 20180101, A
1, 20180102, A
1, 20180103, B
1, 20180104, A
2, 20180101, C
2, 20180102, D
2, 20180103, D
2, 20180104, D

and I would like to create a new column, streak, that counts how many consecutive rows (in date order) each user has had the same item, restarting at 1 whenever the item changes:

id, date, item, streak

1, 20180101, A, 1
1, 20180102, A, 2
1, 20180103, B, 1
1, 20180104, A, 1
2, 20180101, C, 1
2, 20180102, D, 1
2, 20180103, D, 2
2, 20180104, D, 3

I can use the window function row_number, partitioned by id and item and ordered by date, to count the rows of each id-item pair cumulatively, but that count does not restart when the streak is broken by a different item (for id = 1 it gives 3 instead of 1 for item A on 20180104).
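To pin down the expected streak column independently of Spark, here is a minimal plain-Scala sketch (no Spark involved). The names StreakReference, Rec and streaks are hypothetical, and it assumes date values are yyyyMMdd strings, so sorting them as strings is chronological. It walks each id's rows in date order and resets the counter whenever the item differs from the previous row's item.

```scala
// Hypothetical plain-Scala reference (no Spark) for the intended semantics:
// per id, in date order, count consecutive rows with the same item and
// reset to 1 whenever the item changes.
object StreakReference {
  final case class Rec(id: Int, date: String, item: String)

  def streaks(rows: Seq[Rec]): Seq[(Rec, Int)] =
    rows.groupBy(_.id).toSeq.sortBy(_._1).flatMap { case (_, userRows) =>
      var prevItem: Option[String] = None // item of the previous row for this id
      var n = 0                           // length of the current streak
      userRows.sortBy(_.date).map { r =>  // assumes yyyyMMdd strings, so string order is date order
        n = if (prevItem.contains(r.item)) n + 1 else 1
        prevItem = Some(r.item)
        (r, n)
      }
    }
}
```

On the eight sample rows above, StreakReference.streaks(rows).map(_._2) should give Seq(1, 2, 1, 1, 1, 1, 2, 3), matching the streak column of the expected output. A reference like this can be handy for checking a Spark version on small data.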

Upvotes: 1

Views: 340

Answers (1)

Lamanus

Reputation: 13591

Here is the best solution I could come up with:

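import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// w1: rows of the same id-item pair, in date order
val w1 = Window.partitionBy("id", "item").orderBy("date")
// w2: rows of the same id-item pair and the same streak index, in date order
val w2 = Window.partitionBy("id", "item", "index").orderBy("date")
df.withColumn("lag_date", lag("date", 1, "").over(w1))  // previous date of this id-item pair, "" for the first row
  .withColumn("jump", not(col("lag_date") === lit("") || date_add(to_date(col("lag_date"), "yyyyMMdd"), 1) === to_date(col("date"), "yyyyMMdd")).cast("int"))  // 1 = streak broken
  .withColumn("index", sum("jump").over(w1))    // running count of breaks = streak index
  .withColumn("streak", row_number().over(w2))  // position inside the streak
  .orderBy("id", "date")
  .show(false)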

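The jump column is 1 when the previous date of the same id-item pair is not the day immediately before the current date (that is, the streak was broken), and 0 otherwise. Its running sum gives index, which numbers the streaks within each id-item pair, and streak is the row number within each (id, item, index) group. For example, for id = 1 and item = A there are two streaks: index 0 is the first streak, from date = 20180101 to 20180102, and index 1 is the second streak, starting at date = 20180104. If there were a record for id = 1 and item = A with date = 20180105, it would also get index = 1 and continue that streak with streak = 2.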
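To make the jump/index/streak mechanics easier to follow, here is a rough plain-Scala re-expression of the same column logic (again without Spark). JumpIndexSketch, Rec, Out and annotate are hypothetical names; it assumes yyyyMMdd date strings and at most one row per id-item pair per day. As in the Spark code, a streak counts as broken only when the previous date of the same id-item pair is not the immediately preceding calendar day.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical plain-Scala (no Spark) version of the jump/index/streak columns,
// assuming yyyyMMdd date strings and at most one row per id-item pair per day.
object JumpIndexSketch {
  final case class Rec(id: Int, date: String, item: String)
  final case class Out(rec: Rec, jump: Int, index: Int, streak: Int)

  private val fmt = DateTimeFormatter.BASIC_ISO_DATE // parses yyyyMMdd

  def annotate(rows: Seq[Rec]): Seq[Out] =
    rows.groupBy(r => (r.id, r.item)).values.toSeq.flatMap { group => // like window w1
      var prev: Option[LocalDate] = None // lag of date within the id-item pair
      var index = 0                      // running sum of jump
      var streak = 0                     // row number within (id, item, index)
      group.sortBy(_.date).map { r =>
        val d = LocalDate.parse(r.date, fmt)
        // jump = 1 only if a previous date exists and is not exactly one day earlier
        val jump = if (prev.exists(p => p.plusDays(1) != d)) 1 else 0
        index += jump
        streak = if (prev.isEmpty || jump == 1) 1 else streak + 1
        prev = Some(d)
        Out(r, jump, index, streak)
      }
    }.sortBy(o => (o.rec.id, o.rec.date)) // like orderBy("id", "date")
}
```

On the sample data its jump, index and streak values agree with the temporary columns in the Spark output, and appending a row (1, 20180105, A) gives that row index = 1 and streak = 2, as described. Because breaks are detected purely from calendar days, a day with no row at all would also start a new streak.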

The result is:

+---+--------+----+--------+----+-----+------+
|id |date    |item|lag_date|jump|index|streak|
+---+--------+----+--------+----+-----+------+
|1  |20180101|A   |        |0   |0    |1     |
|1  |20180102|A   |20180101|0   |0    |2     |
|1  |20180103|B   |        |0   |0    |1     |
|1  |20180104|A   |20180102|1   |1    |1     |
|2  |20180101|C   |        |0   |0    |1     |
|2  |20180102|D   |        |0   |0    |1     |
|2  |20180103|D   |20180102|0   |0    |2     |
|2  |20180104|D   |20180103|0   |0    |3     |
+---+--------+----+--------+----+-----+------+

I haven't dropped the temporary columns (lag_date, jump, index), so you can see how the code works.
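A different technique often used for this kind of problem, not the one in this answer, is the "difference of row numbers" (gaps-and-islands) trick: within one id, in date order, the row number over id minus the row number over (id, item) stays constant along a run of the same item and changes when another item intervenes. In Spark this should correspond to something like row_number().over(Window.partitionBy("id").orderBy("date")) - row_number().over(Window.partitionBy("id", "item").orderBy("date")) as a grouping column, followed by row_number() over a window partitioned by id, item and that column. The sketch shows the idea in plain Scala; RowNumberDiffSketch, Rec and streaks are hypothetical names, and yyyyMMdd date strings are assumed.

```scala
import scala.collection.mutable

// Hypothetical plain-Scala sketch of the "difference of row numbers" technique:
// rowNumber(id) - rowNumber(id, item) is constant along one run of the same item.
object RowNumberDiffSketch {
  final case class Rec(id: Int, date: String, item: String)

  def streaks(rows: Seq[Rec]): Seq[(Rec, Int)] =
    rows.groupBy(_.id).toSeq.sortBy(_._1).flatMap { case (_, userRows) =>
      val rnItem = mutable.Map.empty[String, Int].withDefaultValue(0)         // row number over (id, item)
      val rnGroup = mutable.Map.empty[(String, Int), Int].withDefaultValue(0) // row number over (id, item, grp)
      userRows.sortBy(_.date).zipWithIndex.map { case (r, i) =>
        val rnId = i + 1                // row number over (id), 1-based
        rnItem(r.item) += 1
        val grp = rnId - rnItem(r.item) // same value for every row of one run
        rnGroup((r.item, grp)) += 1
        (r, rnGroup((r.item, grp)))
      }
    }
}
```

For the sample rows it gives the same streak values (1, 2, 1, 1, 1, 1, 2, 3). The design difference is that it counts consecutive rows rather than consecutive calendar days: two A rows on 20180101 and 20180105 with nothing in between get streaks 1 and 2 here, whereas the date-based answer would start a new streak. Which behaviour is right depends on whether missing days should break a streak.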

Upvotes: 1
