Reputation: 97
Is there a way to assign an operation id based on the value of the state column? The aim is to assign an incrementing ID to each start-on-end sequence. For example, in the table below, the initial operation that started on 2020-09-15 22:53 gets operation id = 1, and all the rows until the end of that operation also take id 1. The start and end states, and all "on" states in between, share the same id.
Timestamp        | state | operation id
-----------------|-------|-------------
2020-09-15 22:53 | start | 1
2020-09-16 22:53 | on    | 1
2020-09-17 22:53 | on    | 1
2020-09-18 22:53 | on    | 1
2020-09-19 22:53 | end   | 1
2020-09-20 22:53 | off   | null
2020-09-21 22:53 | off   | null
2020-09-22 22:53 | off   | null
2020-09-23 22:53 | start | 2
2020-09-24 22:53 | on    | 2
2020-09-25 22:53 | end   | 2
2020-09-26 22:53 | start | 3
2020-09-27 22:53 | end   | 3
Timestamp and state columns are available. The aim is to build the operation id column.
Upvotes: 1
Views: 328
Reputation: 3419
You can use a Window function ordered by 'Timestamp'. Since you want the operation_id to always be null when the 'state' is 'off', I would filter the 'off' rows out into a separate dataframe. We'll assign 'start' as 1, 'on' as 0, and 'end' as 2.
First, get an incremental sum on this new number-assigned 'state' column over your window. The incremental sum corresponding to the 'end' state will always be a multiple of 3, which also marks the end of a sequence.
To get what you need, use a lag function on the incremental sum column and then replace the multiples of 3 with the lag values. The final step is to divide by 3, cast it to an integer, and add 1.
Now union df_not_off and df_off for the final output.
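As a side note, union matches columns by position, so both dataframes need the same column order. If you'd rather match by name and give the null column an explicit integer type, something like this should also work:
df_off = df_off.withColumn("operation_id", F.lit(None).cast("integer"))
final_df = df_not_off.unionByName(df_off)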
Your dataframe:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Simplified sample data: integer timestamps stand in for the real ones.
schema = StructType([StructField("Timestamp", IntegerType()), StructField("state", StringType())])
data = [[1, 'start'], [2, 'on'], [3, 'on'], [4, 'on'], [5, 'end'], [6, 'off'], [7, 'off'],
        [8, 'off'], [9, 'start'], [10, 'on'], [11, 'end'], [12, 'start'], [13, 'end']]
df = spark.createDataFrame(data, schema=schema)
df.show()
Operations:
# Keep the 'off' rows aside; their operation_id will stay null.
df_off = df.filter(col("state")=="off")
df_not_off = df.filter(col("state")!="off")

# Map states to numbers: start -> 1, on -> 0, end -> 2.
df_not_off = df_not_off.withColumn("state_num", F.when(col("state")=="start", 1).when(col("state")=="on", 0).otherwise(2))

w = Window().orderBy("Timestamp")

# The running sum hits a multiple of 3 on every 'end' row; replace those values with the
# previous row's sum, then divide by 3, cast to integer and add 1 to get the operation id.
df_not_off = df_not_off.withColumn("incremental_sum", F.sum("state_num").over(w))\
    .withColumn("lag", F.lag("incremental_sum").over(w))\
    .withColumn("incremental_sum", F.when(F.col("incremental_sum")%3==0, F.col("lag")).otherwise(F.col("incremental_sum")))\
    .withColumn("incremental_sum", ((F.col("incremental_sum")/3).cast('integer'))+1)\
    .withColumnRenamed("incremental_sum", "operation_id")\
    .drop("state_num", "lag")

# Give the 'off' rows a null operation_id and union the two halves back together.
df_off = df_off.withColumn("operation_id", F.lit(None))
final_df = df_not_off.union(df_off)
final_df.orderBy("Timestamp").show()
Output:
+---------+-----+------------+
|Timestamp|state|operation_id|
+---------+-----+------------+
| 1|start| 1|
| 2| on| 1|
| 3| on| 1|
| 4| on| 1|
| 5| end| 1|
| 6| off| null|
| 7| off| null|
| 8| off| null|
| 9|start| 2|
| 10| on| 2|
| 11| end| 2|
| 12|start| 3|
| 13| end| 3|
+---------+-----+------------+
Upvotes: 2