Reputation: 23
I have the following Spark DataFrame:
id | time |Value|
id1 | 2020-02-22 04:57:36.843 | 1.4 |
id2 | 2020-02-22 04:57:50.850 | 1.7 |
id3 | 2020-02-22 04:58:02.133 | 1.2 |
I want to insert rows after each existing one at a fixed time interval (e.g. 5 s). The output should look like this:
id | time |Value|
id1 | 2020-02-22 04:57:36.843 | 1.4 |
id1 | 2020-02-22 04:57:41.843 | |
id1 | 2020-02-22 04:57:46.843 | |
id1 | 2020-02-22 04:57:51.843 | |
id2 | 2020-02-22 04:57:50.850 | 1.7 |
id2 | 2020-02-22 04:57:55.850 | |
id2 | 2020-02-22 04:58:00.850 | |
id2 | 2020-02-22 04:58:05.850 | |
id3 | 2020-02-22 04:58:02.133 | 1.2 |
id3 | 2020-02-22 04:58:07.133 | |
id3 | 2020-02-22 04:58:12.133 | |
id3 | 2020-02-22 04:58:17.133 | |
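To pin down the rule behind that table, here is a minimal plain-Python sketch (not Spark). It assumes each original row is followed by exactly three empty rows spaced 5 s apart, which is what the example shows; `expand_rows`, `step_seconds` and `extra` are hypothetical names used only for illustration.

```python
from datetime import datetime, timedelta


def expand_rows(rows, step_seconds=5, extra=3):
    """Return each (id, time, value) row followed by `extra` rows with no value.

    The extra rows keep the id of the original row and are spaced
    `step_seconds` apart, starting from the original timestamp.
    """
    step = timedelta(seconds=step_seconds)
    out = []
    for row_id, start, value in rows:
        out.append((row_id, start, value))
        for k in range(1, extra + 1):
            # Inserted rows carry the id forward but have an empty Value.
            out.append((row_id, start + k * step, None))
    return out


rows = [
    ("id1", datetime(2020, 2, 22, 4, 57, 36, 843000), 1.4),
    ("id2", datetime(2020, 2, 22, 4, 57, 50, 850000), 1.7),
    ("id3", datetime(2020, 2, 22, 4, 58, 2, 133000), 1.2),
]

expanded = expand_rows(rows)
for row_id, t, value in expanded:
    print(row_id, t.isoformat(sep=" ", timespec="milliseconds"), value)
```

If the number of inserted rows should instead depend on the gap to the next row, `extra` would have to be computed per row rather than fixed.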
I have tried to get this with a for-loop, creating a new DataFrame for each new row and merging it into the existing one with "union", but this was not successful. In particular, I do not get the id with this approach.
Do you have an idea how I can reach my desired output?
Upvotes: 1
Views: 1240
Reputation: 13591
Here is my attempt, with some modifications to your data; for example, I can't see how a timestamp with 62 seconds could exist.
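from pyspark.sql import Window
from pyspark.sql.functions import coalesce, explode, expr, lead, to_timestamp

# One global window ordered by time, so lead() returns the next row's timestamp.
w = Window.orderBy('time')

# 1. Parse time as a timestamp.
# 2. time2 = next row's time, or time + 10 seconds for the last row.
# 3. seq = timestamps from time up to time2 + 5 seconds in 5-second steps, exploded to one row each.
# 4. Left-join the original Value back and fill the inserted rows with 0.
df.select('id', 'time') \
  .withColumn('time', to_timestamp('time', 'yyyy-MM-dd HH:mm:ss.SSS')) \
  .withColumn('time2', coalesce(lead('time', 1).over(w), expr('time + interval 10 seconds'))) \
  .withColumn('seq', expr("sequence(time, time2 + interval 5 seconds, interval 5 seconds)")) \
  .withColumn('time', explode('seq')) \
  .select('id', 'time') \
  .join(df, ['id', 'time'], 'left') \
  .fillna(0).show(20, False)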
+---+-----------------------+-----+
|id |time |Value|
+---+-----------------------+-----+
|id1|2020-02-22 04:57:36.843|1.4 |
|id1|2020-02-22 04:57:41.843|0.0 |
|id1|2020-02-22 04:57:46.843|0.0 |
|id1|2020-02-22 04:57:51.843|0.0 |
|id2|2020-02-22 04:57:50.85 |1.7 |
|id2|2020-02-22 04:57:55.85 |0.0 |
|id2|2020-02-22 04:58:00.85 |0.0 |
|id3|2020-02-22 04:57:59.133|1.2 |
|id3|2020-02-22 04:58:04.133|0.0 |
|id3|2020-02-22 04:58:09.133|0.0 |
|id3|2020-02-22 04:58:14.133|0.0 |
+---+-----------------------+-----+
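To see why each id gets that number of rows, the same logic can be traced in plain Python. This is only a sketch that mimics the query above, not Spark itself: it assumes `sequence` includes the stop value when a step lands exactly on it, and `fill_until_next`, `step` and `tail` are hypothetical names. The input uses my modified timestamp for id3 (04:57:59.133).

```python
from datetime import datetime, timedelta


def fill_until_next(rows, step=timedelta(seconds=5), tail=timedelta(seconds=10)):
    """Mimic lead/coalesce + sequence + explode for (id, time, value) rows."""
    rows = sorted(rows, key=lambda r: r[1])  # Window.orderBy('time')
    out = []
    for i, (row_id, start, value) in enumerate(rows):
        # lead('time', 1), falling back to time + 10 s for the last row.
        next_time = rows[i + 1][1] if i + 1 < len(rows) else start + tail
        # sequence(time, time2 + 5 s, 5 s): stop is inclusive.
        stop = next_time + step
        t = start
        while t <= stop:
            # Only the original timestamp finds a match in the left join;
            # the inserted rows get 0.0, as fillna(0) does.
            out.append((row_id, t, value if t == start else 0.0))
            t += step
    return out


rows = [
    ("id1", datetime(2020, 2, 22, 4, 57, 36, 843000), 1.4),
    ("id2", datetime(2020, 2, 22, 4, 57, 50, 850000), 1.7),
    ("id3", datetime(2020, 2, 22, 4, 57, 59, 133000), 1.2),
]

filled = fill_until_next(rows)
for row_id, t, value in filled:
    print(row_id, t.isoformat(sep=" ", timespec="milliseconds"), value)
```

Because the stop is the next row's time plus 5 seconds, the number of rows per id depends on the gap to the next row, which is why id2 ends up with three rows while id1 and id3 have four.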
Upvotes: 2