Milo Ventimiglia

Reputation: 91

PySpark - Manipulate dataframe to get time since status change

I have a PySpark dataframe describing the state of a service, as shown below. The rate at which I receive updates of the service status is not constant.

status  timestamp_of_update
OK      2020-01-01 14:30:00
OK      2020-01-01 14:15:00
Broken  2020-01-01 14:10:00
Broken  2020-01-01 14:00:00
Broken  2020-01-01 13:40:00
Broken  2020-01-01 13:35:00
OK      2020-01-01 13:15:00
OK      2020-01-01 13:00:00
OK      2020-01-01 12:40:00
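
For reproducibility, here is a minimal sketch that builds this sample dataframe (assuming the timestamps should be actual timestamp values rather than strings):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# sample data matching the table above
data = [
    ('OK', '2020-01-01 14:30:00'),
    ('OK', '2020-01-01 14:15:00'),
    ('Broken', '2020-01-01 14:10:00'),
    ('Broken', '2020-01-01 14:00:00'),
    ('Broken', '2020-01-01 13:40:00'),
    ('Broken', '2020-01-01 13:35:00'),
    ('OK', '2020-01-01 13:15:00'),
    ('OK', '2020-01-01 13:00:00'),
    ('OK', '2020-01-01 12:40:00'),
]

df = spark.createDataFrame(data, ['status', 'timestamp_of_update']) \
    .withColumn('timestamp_of_update', F.to_timestamp('timestamp_of_update'))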

Based on this, I would like to create a column that gives me the elapsed time since the last relevant update, according to the following conditions:

  1. If the status is "OK" and the service has been running without issues, the column should contain the time elapsed since the previous update.

  2. If the status is "Broken", it should contain the time elapsed since the service went down (i.e. since the last "OK" update).

  3. When the status goes back to "OK" again, the difference should be zero.

So, the final dataset should look something like this:

status  timestamp_of_update  time_gone_by
OK      2020-01-01 14:30:00  15mins
OK      2020-01-01 14:15:00  0mins
Broken  2020-01-01 14:10:00  55mins
Broken  2020-01-01 14:00:00  45mins
Broken  2020-01-01 13:40:00  25mins
Broken  2020-01-01 13:35:00  20mins
OK      2020-01-01 13:15:00  15mins
OK      2020-01-01 13:00:00  20mins
OK      2020-01-01 12:40:00  NaN

Does anyone have an idea of how to do this in PySpark? Thanks!

Upvotes: 0

Views: 469

Answers (1)

mck

Reputation: 42342

You can create some helper columns to check the conditions required:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    # flag the earliest row, which has no previous update to compare against
    'first_row',
    F.row_number().over(Window.orderBy('timestamp_of_update')) == 1
).withColumn(
    # flag rows where the status switches back to OK after being something else
    'change_to_ok',
    (F.lag('status').over(Window.orderBy('timestamp_of_update')) != 'OK') &
    (F.col('status') == 'OK')
).withColumn(
    # most recent timestamp (up to and including this row) where the status was OK
    'last_ok',
    F.last(
        F.when(F.col('status') == 'OK', F.col('timestamp_of_update')),
        True
    ).over(Window.orderBy('timestamp_of_update'))
).withColumn(
    # reference timestamp to subtract from, depending on the status
    'time',
    F.when(
        F.col('status') == 'Broken',
        F.col('last_ok')
    ).when(
        F.col('change_to_ok'),
        F.col('timestamp_of_update')
    ).when(
        F.col('status') == 'OK',
        F.lag('timestamp_of_update').over(Window.orderBy('timestamp_of_update'))
    )
).withColumn(
    # elapsed time in minutes between this update and the reference timestamp
    'time_gone_by',
    (F.unix_timestamp('timestamp_of_update') - F.unix_timestamp('time'))/60
).select('status', 'timestamp_of_update', 'time_gone_by')

Result:

df2.show()
+------+-------------------+------------+
|status|timestamp_of_update|time_gone_by|
+------+-------------------+------------+
|    OK|2020-01-01 12:40:00|        null|
|    OK|2020-01-01 13:00:00|        20.0|
|    OK|2020-01-01 13:15:00|        15.0|
|Broken|2020-01-01 13:35:00|        20.0|
|Broken|2020-01-01 13:40:00|        25.0|
|Broken|2020-01-01 14:00:00|        45.0|
|Broken|2020-01-01 14:10:00|        55.0|
|    OK|2020-01-01 14:15:00|         0.0|
|    OK|2020-01-01 14:30:00|        15.0|
+------+-------------------+------------+
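
If you want the column formatted as strings like 15mins, as in the expected output, one possible follow-up (a sketch, assuming whole minutes) is to cast and concatenate:

df3 = df2.withColumn(
    'time_gone_by',
    F.concat(F.col('time_gone_by').cast('int').cast('string'), F.lit('mins'))
)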

Behind the scenes:

+------+-------------------+---------+------------+-------------------+-------------------+------------+
|status|timestamp_of_update|first_row|change_to_ok|            last_ok|               time|time_gone_by|
+------+-------------------+---------+------------+-------------------+-------------------+------------+
|    OK|2020-01-01 12:40:00|     true|        null|2020-01-01 12:40:00|               null|        null|
|    OK|2020-01-01 13:00:00|    false|       false|2020-01-01 13:00:00|2020-01-01 12:40:00|        20.0|
|    OK|2020-01-01 13:15:00|    false|       false|2020-01-01 13:15:00|2020-01-01 13:00:00|        15.0|
|Broken|2020-01-01 13:35:00|    false|       false|2020-01-01 13:15:00|2020-01-01 13:15:00|        20.0|
|Broken|2020-01-01 13:40:00|    false|       false|2020-01-01 13:15:00|2020-01-01 13:15:00|        25.0|
|Broken|2020-01-01 14:00:00|    false|       false|2020-01-01 13:15:00|2020-01-01 13:15:00|        45.0|
|Broken|2020-01-01 14:10:00|    false|       false|2020-01-01 13:15:00|2020-01-01 13:15:00|        55.0|
|    OK|2020-01-01 14:15:00|    false|        true|2020-01-01 14:15:00|2020-01-01 14:15:00|         0.0|
|    OK|2020-01-01 14:30:00|    false|       false|2020-01-01 14:30:00|2020-01-01 14:15:00|        15.0|
+------+-------------------+---------+------------+-------------------+-------------------+------------+
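
Note that all the windows above are unpartitioned, so Spark processes every row in a single partition. If your real data covers several services, you would likely partition each window by a service identifier (a hypothetical service_id column here), for example:

from pyspark.sql import functions as F, Window

w = Window.partitionBy('service_id').orderBy('timestamp_of_update')

df2 = df.withColumn(
    'change_to_ok',
    (F.lag('status').over(w) != 'OK') & (F.col('status') == 'OK')
)
# ... and likewise replace each Window.orderBy('timestamp_of_update')
# in the code above with this partitioned window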

Upvotes: 1
