miket
miket

Reputation: 23

Pyspark: Insert rows with specific timestamp into dataframe

I have following Spark Dataframe:

   id  |  time                   |Value|    
   id1 | 2020-02-22 04:57:36.843 | 1.4 |
   id2 | 2020-02-22 04:57:50.850 | 1.7 |
   id3 | 2020-02-22 04:58:02.133 | 1.2 |

I want to insert rows between the existing ones with a certain timely distance (e.g. 5s). The output should look like this:

   id  |  time                   |Value|    
   id1 | 2020-02-22 04:57:36.843 | 1.4 |
   id1 | 2020-02-22 04:57:41.843 |     |
   id1 | 2020-02-22 04:57:46.843 |     |
   id1 | 2020-02-22 04:57:51.843 |     |
   id2 | 2020-02-22 04:57:50.850 | 1.7 |
   id2 | 2020-02-22 04:57:55.850 |     |
   id2 | 2020-02-22 04:58:00.850 |     |
   id2 | 2020-02-22 04:58:05.850 |     |
   id3 | 2020-02-22 04:58:02.133 | 1.2 |
   id3 | 2020-02-22 04:58:07.133 |     |
   id3 | 2020-02-22 04:58:12.133 |     |
   id3 | 2020-02-22 04:58:17.133 |     |

I have tried do get this through a for-loop, creating new dataframes (of each new row) and merge it to the existing one by "union", which was not successful. I especially do not get the id by this approach.

Do you have an idea how I reach my desired output?

Upvotes: 1

Views: 1240

Answers (1)

Lamanus
Lamanus

Reputation: 13591

Here is my try with some modifications, for example, I can't understand how there exist 62 seconds.

from pyspark.sql.functions import *
from pyspark.sql import Window

w = Window.orderBy('time')

df.select('id', 'time') \
  .withColumn('time', to_timestamp('time', 'yyyy-MM-dd HH:mm:ss.SSS')) \
  .withColumn('time2', coalesce(lead('time', 1).over(w), expr('time + interval 10 seconds'))) \
  .withColumn('seq', expr("sequence(time, time2 + interval 5 seconds, interval 5 seconds)")) \
  .withColumn('time', explode('seq')) \
  .select('id', 'time') \
  .join(df, ['id', 'time'], 'left') \
  .fillna(0).show(20, False)

+---+-----------------------+-----+
|id |time                   |Value|
+---+-----------------------+-----+
|id1|2020-02-22 04:57:36.843|1.4  |
|id1|2020-02-22 04:57:41.843|0.0  |
|id1|2020-02-22 04:57:46.843|0.0  |
|id1|2020-02-22 04:57:51.843|0.0  |
|id2|2020-02-22 04:57:50.85 |1.7  |
|id2|2020-02-22 04:57:55.85 |0.0  |
|id2|2020-02-22 04:58:00.85 |0.0  |
|id3|2020-02-22 04:57:59.133|1.2  |
|id3|2020-02-22 04:58:04.133|0.0  |
|id3|2020-02-22 04:58:09.133|0.0  |
|id3|2020-02-22 04:58:14.133|0.0  |
+---+-----------------------+-----+

Upvotes: 2

Related Questions