Reputation: 91
I have a DataFrame in PySpark describing the state of a service, as shown below. The rate at which I receive status updates is not constant.
| status | timestamp_of_update |
|---|---|
| OK     | 2020-01-01 14:30:00 |
| OK     | 2020-01-01 14:15:00 |
| Broken | 2020-01-01 14:10:00 |
| Broken | 2020-01-01 14:00:00 |
| Broken | 2020-01-01 13:40:00 |
| Broken | 2020-01-01 13:35:00 |
| OK     | 2020-01-01 13:15:00 |
| OK     | 2020-01-01 13:00:00 |
| OK     | 2020-01-01 12:40:00 |
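For reproducibility, the sample data can be built with something like this (a minimal sketch; timestamp_of_update is assumed to be a timestamp column):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        ('OK', '2020-01-01 14:30:00'),
        ('OK', '2020-01-01 14:15:00'),
        ('Broken', '2020-01-01 14:10:00'),
        ('Broken', '2020-01-01 14:00:00'),
        ('Broken', '2020-01-01 13:40:00'),
        ('Broken', '2020-01-01 13:35:00'),
        ('OK', '2020-01-01 13:15:00'),
        ('OK', '2020-01-01 13:00:00'),
        ('OK', '2020-01-01 12:40:00'),
    ],
    ['status', 'timestamp_of_update'],
).withColumn('timestamp_of_update', F.to_timestamp('timestamp_of_update'))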
Based on this, I would like to create a column that gives the time elapsed since the last relevant update, according to the following rules:
- If the status is "OK" and the service has been running without issues, the value is the time difference since the previous update.
- If the status is "Broken", the value is the time since the service went down, i.e. since the last "OK" update.
- When the status switches back to "OK", the difference should be zero.
So, the final dataset should look something like this:
| status | timestamp_of_update | time_gone_by |
|---|---|---|
| OK     | 2020-01-01 14:30:00 | 15mins |
| OK     | 2020-01-01 14:15:00 | 0mins  |
| Broken | 2020-01-01 14:10:00 | 55mins |
| Broken | 2020-01-01 14:00:00 | 45mins |
| Broken | 2020-01-01 13:40:00 | 25mins |
| Broken | 2020-01-01 13:35:00 | 20mins |
| OK     | 2020-01-01 13:15:00 | 15mins |
| OK     | 2020-01-01 13:00:00 | 20mins |
| OK     | 2020-01-01 12:40:00 | NaN    |
Does anyone have an idea of how to do this in PySpark? Thanks!
Upvotes: 0
Views: 469
Reputation: 42342
You can create some helper columns to check the conditions required:
from pyspark.sql import functions as F, Window

# Running window ordered by the update time (default frame: unbounded preceding to current row)
w = Window.orderBy('timestamp_of_update')

df2 = df.withColumn(
    'first_row',  # flags the earliest row, which has no previous update
    F.row_number().over(w) == 1
).withColumn(
    'change_to_ok',  # True when the status flips from Broken back to OK
    (F.lag('status').over(w) != 'OK') & (F.col('status') == 'OK')
).withColumn(
    'last_ok',  # most recent OK timestamp seen so far
    F.last(
        F.when(F.col('status') == 'OK', F.col('timestamp_of_update')),
        True  # ignorenulls
    ).over(w)
).withColumn(
    'time',  # reference timestamp to measure against
    F.when(
        F.col('status') == 'Broken',
        F.col('last_ok')
    ).when(
        F.col('change_to_ok'),
        F.col('timestamp_of_update')
    ).when(
        F.col('status') == 'OK',
        F.lag('timestamp_of_update').over(w)
    )
).withColumn(
    'time_gone_by',  # difference in minutes
    (F.unix_timestamp('timestamp_of_update') - F.unix_timestamp('time')) / 60
).select('status', 'timestamp_of_update', 'time_gone_by')
Result:
df2.show()
+------+-------------------+------------+
|status|timestamp_of_update|time_gone_by|
+------+-------------------+------------+
| OK|2020-01-01 12:40:00| null|
| OK|2020-01-01 13:00:00| 20.0|
| OK|2020-01-01 13:15:00| 15.0|
|Broken|2020-01-01 13:35:00| 20.0|
|Broken|2020-01-01 13:40:00| 25.0|
|Broken|2020-01-01 14:00:00| 45.0|
|Broken|2020-01-01 14:10:00| 55.0|
| OK|2020-01-01 14:15:00| 0.0|
| OK|2020-01-01 14:30:00| 15.0|
+------+-------------------+------------+
Behind the scenes:
+------+-------------------+---------+------------+-------------------+-------------------+------------+
|status|timestamp_of_update|first_row|change_to_ok| last_ok| time|time_gone_by|
+------+-------------------+---------+------------+-------------------+-------------------+------------+
| OK|2020-01-01 12:40:00| true| null|2020-01-01 12:40:00| null| null|
| OK|2020-01-01 13:00:00| false| false|2020-01-01 13:00:00|2020-01-01 12:40:00| 20.0|
| OK|2020-01-01 13:15:00| false| false|2020-01-01 13:15:00|2020-01-01 13:00:00| 15.0|
|Broken|2020-01-01 13:35:00| false| false|2020-01-01 13:15:00|2020-01-01 13:15:00| 20.0|
|Broken|2020-01-01 13:40:00| false| false|2020-01-01 13:15:00|2020-01-01 13:15:00| 25.0|
|Broken|2020-01-01 14:00:00| false| false|2020-01-01 13:15:00|2020-01-01 13:15:00| 45.0|
|Broken|2020-01-01 14:10:00| false| false|2020-01-01 13:15:00|2020-01-01 13:15:00| 55.0|
| OK|2020-01-01 14:15:00| false| true|2020-01-01 14:15:00|2020-01-01 14:15:00| 0.0|
| OK|2020-01-01 14:30:00| false| false|2020-01-01 14:30:00|2020-01-01 14:15:00| 15.0|
+------+-------------------+---------+------------+-------------------+-------------------+------------+
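If you want the column to literally read like "15mins" as in the expected output, you can format the numeric minutes afterwards, for example (a small sketch assuming whole-minute differences):

df3 = df2.withColumn(
    'time_gone_by',
    F.when(
        F.col('time_gone_by').isNotNull(),
        F.concat(F.col('time_gone_by').cast('int').cast('string'), F.lit('mins'))
    )  # null rows (the first update) stay null
)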
Upvotes: 1