Dcook
Dcook

Reputation: 961

Detect when certain column value changes from value1 to value2 in pyspark

I have a dataframe like below:

|DateTime           |uid      |fid.    |code       |DataLen|result     |

|2020-02-23 11:42:34|38       |0000126D|35         |02     |24         |
|2020-02-24 11:47:34|38       |0000126D|35         |02     |24         |
|2020-02-24 11:48:34|38       |0000126D|35         |02     |23         |
|2020-02-24 11:49:34|38       |0000126D|35         |02     |23         |
|2020-02-24 11:50:34|38       |0000126D|35         |02     |22         |
|2020-02-25 11:52:34|38       |0000126D|35         |02     |22         |
|2020-02-25 11:12:35|38       |0000126D|35         |02     |21         |
|2020-02-26 11:34:35|38       |0000126D|35         |02     |21         |
|2020-02-27 11:12:35|38       |0000126D|35         |02     |2A         |
|2020-02-28 11:43:35|38       |0000126D|35         |02     |2A         |
|2020-03-01 11:23:35|38       |0000126D|35         |02     |24         |
|2020-03-02 11:10:35|38       |0000126D|35         |02     |23         |
|2020-03-03 11:07:35|38       |0000126D|35         |02     |22         |
|2020-03-04 11:31:35|38       |0000126D|35         |02     |21         |
|2020-03-05 11:07:35|38       |0000126D|35         |02     |2A         |
|2020-03-06 11:17:35|38       |0000126D|35         |02     |2A         |
|2020-03-07 11:15:47|38       |0000126D|35         |02     |24         |
|2020-03-08 11:34:09|38       |0000126D|35         |02     |24         |

Output I need:

|DateTime           |uid      |fid.    |code       |DataLen|result     |Bool
|2020-02-23 11:42:34|38       |0000126D|35         |02     |24         |T0
|2020-02-24 11:47:34|38       |0000126D|35         |02     |24         |F
|2020-02-24 11:48:34|38       |0000126D|35         |02     |23         |F
|2020-02-24 11:49:34|38       |0000126D|35         |02     |23         |F
|2020-02-24 11:50:34|38       |0000126D|35         |02     |22         |F
|2020-02-25 11:52:34|38       |0000126D|35         |02     |22         |F
|2020-02-25 11:12:35|38       |0000126D|35         |02     |21         |F
|2020-02-26 11:34:35|38       |0000126D|35         |02     |21         |F
|2020-02-27 11:12:35|38       |0000126D|35         |02     |2A         |F
|2020-02-28 11:43:35|38       |0000126D|35         |02     |2A         |T1
|2020-03-01 11:23:35|38       |0000126D|35         |02     |24         |T0
|2020-03-02 11:10:35|38       |0000126D|35         |02     |23         |F
|2020-03-03 11:07:35|38       |0000126D|35         |02     |22         |F
|2020-03-04 11:31:35|38       |0000126D|35         |02     |21         |F
|2020-03-05 11:07:35|38       |0000126D|35         |02     |2A         |F
|2020-03-06 11:17:35|38       |0000126D|35         |02     |2A         |T1
|2020-03-07 11:15:47|38       |0000126D|35         |02     |24         |T0
|2020-03-08 11:34:09|38       |0000126D|35         |02     |24         |F

I want get the 'DateTime' value when values of result changes from 2A to 24. So basically 24 to 2A is a cycle for each sensor. I want get the datetime value for first record of "24"(T0) and when the value changes like above.

How can I do that in most efficient way considering I have to do this multiple times for each sensor id and the data set is having 1000k records

Upvotes: 5

Views: 1634

Answers (1)

notNull
notNull

Reputation: 31470

Use .window lag,lead,row_number functions for this case.

Example:

from pyspark.sql.functions import *
from pyspark.sql import Window
w = Window.orderBy("datetime")

#if rn 1 and result=24 then add T0, if result 2A next value =24 then T1 else F
#if lag_bool T1 and next value is F then T0

df.withColumn("rn",row_number().over(w)).\
withColumn("lead",lead(col("result"),1).over(w)).\
withColumn("bool_tmp",when((col("result") == "24") &(col("rn") == 1) , lit("T0")).when((col("lead") == "24") & (col("result") =="2A"), lit("T1")).otherwise(lit("F"))).\
withColumn("lag_bool",lag(col("bool_tmp"),1).over(w)).withColumn("bool",when((col("bool_tmp") == "F") & (col("lag_bool") =="T1"), lit("T0")).otherwise(col("bool_tmp"))).\
drop("rn","bool_tmp","lag_bool","lead").\
show()

#+-------------------+---+--------+----+-------+------+----+
#|           datetime|uid|sensorid|code|datalen|result|bool|
#+-------------------+---+--------+----+-------+------+----+
#|2020-02-23 11:42:34| 38|0000126D|  35|     02|    24|  T0|
#|2020-02-24 11:47:34| 38|0000126D|  35|     02|    24|   F|
#|2020-02-24 11:48:34| 38|0000126D|  35|     02|    23|   F|
#|2020-02-24 11:49:34| 38|0000126D|  35|     02|    23|   F|
#|2020-02-24 11:50:34| 38|0000126D|  35|     02|    22|   F|
#|2020-02-25 11:12:35| 38|0000126D|  35|     02|    21|   F|
#|2020-02-25 11:52:34| 38|0000126D|  35|     02|    22|   F|
#|2020-02-26 11:34:35| 38|0000126D|  35|     02|    21|   F|
#|2020-02-27 11:12:35| 38|0000126D|  35|     02|    2A|   F|
#|2020-02-28 11:43:35| 38|0000126D|  35|     02|    2A|  T1|
#|2020-03-01 11:23:35| 38|0000126D|  35|     02|    24|  T0|
#|2020-03-02 11:10:35| 38|0000126D|  35|     02|    23|   F|
#|2020-03-03 11:07:35| 38|0000126D|  35|     02|    22|   F|
#|2020-03-04 11:31:35| 38|0000126D|  35|     02|    21|   F|
#|2020-03-05 11:07:35| 38|0000126D|  35|     02|    2A|   F|
#|2020-03-06 11:17:35| 38|0000126D|  35|     02|    2A|  T1|
#|2020-03-07 11:15:47| 38|0000126D|  35|     02|    24|  T0|
#|2020-03-08 11:34:09| 38|0000126D|  35|     02|    24|   F|
#+-------------------+---+--------+----+-------+------+----+

Upvotes: 2

Related Questions