aruydzi

Reputation: 97

Assign ID based on another column value

Is there a way to assign an operation id based on the value of the state column? The aim is to assign an incrementing ID to each start-to-end sequence. For example, in the table below, the operation that started on 2020-09-15 22:53 gets operation id = 1, and every row until the end of that operation also takes id 1. The start and end states, and all "on" states in between, share the same id.

Timestamp        | state | operation id
-----------------+-------+-------------
2020-09-15 22:53 | start | 1
2020-09-16 22:53 | on    | 1
2020-09-17 22:53 | on    | 1
2020-09-18 22:53 | on    | 1
2020-09-19 22:53 | end   | 1
2020-09-20 22:53 | off   | null
2020-09-21 22:53 | off   | null
2020-09-22 22:53 | off   | null
2020-09-23 22:53 | start | 2
2020-09-24 22:53 | on    | 2
2020-09-25 22:53 | end   | 2
2020-09-26 22:53 | start | 3
2020-09-27 22:53 | end   | 3

Only the Timestamp and state columns are available; the operation id column needs to be built.

Upvotes: 1

Views: 328

Answers (1)

Cena

Reputation: 3419

You can use a window function ordered by 'Timestamp'. Since you want the operation_id to always be null when the 'state' is 'off', filter out the 'off' rows and keep them in a separate dataframe. For the remaining rows, assign 'start' as 1, 'on' as 0, and 'end' as 2.

First, take an incremental sum over this number-mapped 'state' column across your window. A complete sequence contributes 1 ('start') + 0 for each 'on' + 2 ('end') = 3, so the incremental sum on an 'end' row is always a multiple of 3. That marks the end of a sequence.
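Hand-computing this on the sample data below makes it concrete (an illustration, not part of the code):

state           : start on on on end | start on end | start end
state_num       : 1     0  0  0  2   | 1     0  2   | 1     2
incremental_sum : 1     1  1  1  3   | 4     4  6   | 7     9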

To get what you need, take a lag of the incremental sum column and replace the multiples of 3 with their lag values, so that every row of a sequence carries the same number. The final step is to divide by 3, cast to an integer, and add 1.
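Continuing the hand-computed sketch:

incremental_sum : 1 1 1 1 3 | 4 4 6 | 7 9
after lag swap  : 1 1 1 1 1 | 4 4 4 | 7 7
(sum / 3) + 1   : 1 1 1 1 1 | 2 2 2 | 3 3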

Finally, union df_not_off and df_off for the final output.

Your dataframe:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Timestamps are simplified to integers 1-13 for this example.
schema = StructType([StructField("Timestamp", IntegerType()), StructField("state", StringType())])

data = [[1, 'start'], [2, 'on'], [3, 'on'], [4, 'on'], [5, 'end'], [6, 'off'], [7, 'off'], \
        [8, 'off'], [9, 'start'], [10, 'on'], [11, 'end'], [12, 'start'], [13, 'end']]

df = spark.createDataFrame(data, schema=schema)

df.show()

Operations:

# Split off the 'off' rows; they always get a null operation_id.
df_off = df.filter(col("state") == "off")
df_not_off = df.filter(col("state") != "off")

# Map states to numbers: 'start' -> 1, 'on' -> 0, 'end' -> 2.
df_not_off = df_not_off.withColumn("state_num", F.when(col("state") == "start", 1).when(col("state") == "on", 0).otherwise(2))

# No partitionBy here, so Spark moves all rows to a single partition;
# fine for this example, but it will warn on large data.
w = Window().orderBy("Timestamp")

# Running sum -> swap multiples of 3 ('end' rows) for the lagged value ->
# integer-divide by 3 and add 1 to get the operation_id.
df_not_off = df_not_off.withColumn("incremental_sum", F.sum("state_num").over(w))\
  .withColumn("lag", F.lag("incremental_sum").over(w))\
  .withColumn("incremental_sum", F.when(F.col("incremental_sum")%3==0, F.col("lag")).otherwise(F.col("incremental_sum")))\
  .withColumn("incremental_sum", ((F.col("incremental_sum")/3).cast('integer'))+1)\
  .withColumnRenamed("incremental_sum", "operation_id")\
  .drop("state_num", "lag")

df_off = df_off.withColumn("operation_id", F.lit(None))

final_df = df_not_off.union(df_off)

final_df.orderBy("Timestamp").show()

Output:

+---------+-----+------------+                                                  
|Timestamp|state|operation_id|
+---------+-----+------------+
|        1|start|           1|
|        2|   on|           1|
|        3|   on|           1|
|        4|   on|           1|
|        5|  end|           1|
|        6|  off|        null|
|        7|  off|        null|
|        8|  off|        null|
|        9|start|           2|
|       10|   on|           2|
|       11|  end|           2|
|       12|start|           3|
|       13|  end|           3|
+---------+-----+------------+
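One side note: F.lit(None) produces a NullType column, which Spark coerces to integer when the union resolves the schemas. If you prefer both dataframes to have identical schemas before the union, you can cast the placeholder explicitly (an optional tweak, not needed for the output above):

df_off = df_off.withColumn("operation_id", F.lit(None).cast("integer"))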

Upvotes: 2
