paveltr

Reputation: 474

How to split data into groups in pyspark

I need to find groups in time series data.

Data sample


I need to produce a group column based on value and day.

I've tried using lag, lead and row_number, but got nowhere.

Upvotes: 3

Views: 1144

Answers (2)

murtihash

Reputation: 8410

Here is a PySpark way to do this: use lag to flag each row where the value differs from the previous row, take a running sum of that flag to number the groups, and add 1 so the numbering starts at 1.

from pyspark.sql.window import Window
from pyspark.sql import functions as F

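# No partitionBy: every row is ordered by day within a single window
w1 = Window.orderBy("day")
# "lag" is 1 where value differs from the previous row, else 0 (also 0 on the first row, where lag is null)
df.withColumn("lag", F.when(F.lag("value").over(w1) != F.col("value"), F.lit(1)).otherwise(F.lit(0)))\
  .withColumn("group", F.sum("lag").over(w1) + 1).drop("lag").show()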

#+-----+---+-----+
#|value|day|group|
#+-----+---+-----+
#|    1|  1|    1|
#|    1|  2|    1|
#|    1|  3|    1|
#|    1|  4|    1|
#|    1|  5|    1|
#|    2|  6|    2|
#|    2|  7|    2|
#|    1|  8|    3|
#|    1|  9|    3|
#|    1| 10|    3|
#|    1| 11|    3|
#|    1| 12|    3|
#|    1| 13|    3|
#+-----+---+-----+

Upvotes: 3

GMB

Reputation: 222662

It seems like you want to increment the group every time the value changes. If so, this is a kind of gaps-and-islands problem.

Here is one approach that uses lag() and a cumulative sum():

select
    value,
    day,
    sum(case when value = lag_value then 0 else 1 end) over(order by day) grp
from (
    select t.*, lag(value) over(order by day) lag_value
    from mytable t
) t
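
To make the lag-plus-running-sum logic concrete, here is a minimal plain-Python sketch of the same idea; it is not Spark or SQL code. The function name assign_groups and the (value, day) tuple layout are assumptions made for illustration, and the sample rows simply mirror the value/day pairs from the output table in the PySpark answer.

```python
_MISSING = object()  # stands in for the NULL that lag() yields on the first row


def assign_groups(rows):
    """Take (value, day) tuples and return (value, day, group) tuples.

    The group counter goes up by one each time value differs from the
    value on the previous day, which is the running sum of a change flag.
    """
    ordered = sorted(rows, key=lambda r: r[1])  # equivalent of "order by day"
    result = []
    group = 0
    prev = _MISSING
    for value, day in ordered:
        # change flag: 1 on the first row or when the value changes, else 0
        changed = prev is _MISSING or value != prev
        group += 1 if changed else 0
        result.append((value, day, group))
        prev = value
    return result


# Hypothetical sample mirroring the table above: 1 for days 1-5, 2 for days 6-7, 1 for days 8-13
sample = [(1, d) for d in range(1, 6)] + [(2, 6), (2, 7)] + [(1, d) for d in range(8, 14)]
groups = [g for _, _, g in assign_groups(sample)]
print(groups)
```

Because the first row counts as a change here, the counter starts at 1 without the extra "+ 1" that the PySpark version needs.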

Upvotes: 4
