nehacharya

Reputation: 957

PySpark: timeslice and split dataframe rows into 5-minute intervals on a specific condition

I have a dataframe with the following columns:

+-----+----------+--------------------------+-----------+
|id   | sourceid |        timestamp         | indicator |
+-----+----------+--------------------------+-----------+
| 0   |  128     |  2019-12-03 12:00:00.0   | 0         |
| 1   |  128     |  2019-12-03 12:30:00.0   | 1         |
| 2   |  128     |  2019-12-03 12:37:00.0   | 0         |
| 3   |  128     |  2019-12-03 13:15:00.0   | 1         |
| 4   |  128     |  2019-12-03 13:17:00.0   | 0         | 
+-----+----------+--------------------------+-----------+
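For reference, the sample dataframe can be built like this (a minimal sketch; the timestamps are kept as strings exactly as shown, and can be cast with F.to_timestamp if a proper TimestampType column is preferred):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# sample data from the table above; timestamps kept as strings,
# which Spark coerces when doing interval arithmetic on them
df = spark.createDataFrame(
    [(0, 128, "2019-12-03 12:00:00.0", 0),
     (1, 128, "2019-12-03 12:30:00.0", 1),
     (2, 128, "2019-12-03 12:37:00.0", 0),
     (3, 128, "2019-12-03 13:15:00.0", 1),
     (4, 128, "2019-12-03 13:17:00.0", 0)],
    ["id", "sourceid", "timestamp", "indicator"]
)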

I am trying to split the timestamp column into rows at 5-minute intervals, but only for the rows whose indicator value is not 0.

Explanation: every row whose indicator is 1 (ids 1 and 3 above) should be expanded into rows 5 minutes apart, keeping the same id, sourceid and indicator, as shown in the expected output below.

How can I achieve this with PySpark?

Expected Output:

+-----+----------+--------------------------+-------------+
|id   | sourceid |        timestamp         | indicator   |
+-----+----------+--------------------------+-------------+
| 1   | 128      |  2019-12-03 12:30:00.0   | 1           |
| 1   | 128      |  2019-12-03 12:35:00.0   | 1           |
| 3   | 128      |  2019-12-03 13:15:00.0   | 1           |
| 3   | 128      |  2019-12-03 13:20:00.0   | 1           |
+-----+----------+--------------------------+-------------+

Upvotes: 0

Views: 2018

Answers (1)

jxc

Reputation: 13998

IIUC, you can filter rows based on the indicator values of the current and the next row, and then use array + explode to create the new rows (for testing purposes, I added a few more rows to your original example):

from pyspark.sql import Window, functions as F

w1 = Window.partitionBy('sourceid').orderBy('timestamp')

# add a flag checking whether the next row's indicator is 0
# (lead() returns null on the last row of each partition)
df1 = df.withColumn('next_indicator_is_0', F.lead('indicator').over(w1) == 0) 
df1.show(truncate=False)
+---+--------+---------------------+---------+-------------------+
|id |sourceid|timestamp            |indicator|next_indicator_is_0|
+---+--------+---------------------+---------+-------------------+
|0  |128     |2019-12-03 12:00:00.0|0        |false              |
|1  |128     |2019-12-03 12:30:00.0|1        |true               |
|2  |128     |2019-12-03 12:37:00.0|0        |false              |
|3  |128     |2019-12-03 13:12:00.0|1        |false              |
|4  |128     |2019-12-03 13:15:00.0|1        |true               |
|5  |128     |2019-12-03 13:17:00.0|0        |false              |
|6  |128     |2019-12-03 13:20:00.0|1        |null               |
+---+--------+---------------------+---------+-------------------+

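# keep only the indicator=1 rows that are directly followed by a 0,
# then explode each into its own timestamp and timestamp + 5 minutes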
df1.filter("indicator = 1 AND next_indicator_is_0") \
   .withColumn('timestamp', F.expr("explode(array(`timestamp`, `timestamp` + interval 5 minutes))")) \
   .drop('next_indicator_is_0') \
   .show(truncate=False)
+---+--------+---------------------+---------+
|id |sourceid|timestamp            |indicator|
+---+--------+---------------------+---------+
|1  |128     |2019-12-03 12:30:00.0|1        |
|1  |128     |2019-12-03 12:35:00  |1        |
|4  |128     |2019-12-03 13:15:00.0|1        |
|4  |128     |2019-12-03 13:20:00  |1        |
+---+--------+---------------------+---------+
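If the gap between an indicator 1 and the following 0 can be longer than 10 minutes, the two-element array can be generalized with sequence. A sketch, assuming Spark 2.4+ (where sequence() accepts timestamps) and a timestamp column cast to TimestampType; note that sequence stops at the next row's timestamp, so the exact end-point handling (e.g. whether to emit a step past it, like the 13:20 row above) depends on your requirement:

# sketch: expand each qualifying row into 5-minute steps up to the next
# row's timestamp (requires Spark 2.4+ and a TimestampType `timestamp`)
df2 = df1.withColumn('next_ts', F.lead('timestamp').over(w1)) \
         .filter("indicator = 1 AND next_indicator_is_0") \
         .withColumn('timestamp', F.expr("explode(sequence(`timestamp`, `next_ts`, interval 5 minutes))")) \
         .drop('next_ts', 'next_indicator_is_0')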

Note: you can reset the id column by using F.row_number().over(w1) or F.monotonically_increasing_id(), depending on your requirements.
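For instance, a minimal sketch of the row_number variant, assuming the exploded result above is bound to a name, say df_out (hypothetical):

# renumber rows consecutively per sourceid, ordered by timestamp (0-based)
df_out = df_out.withColumn('id', F.row_number().over(w1) - 1)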

Upvotes: 3
