Reputation: 369
Short Version
The task is to "go through" a pyspark dataframe, ordered by time. Whenever a keyword appears, we need to add another keyword in the same column a fixed time later.
Example
Consider the following data frame:
columns = ["time","ID","marker"]
data = [
(0.0, "ID1", None),
(0.1, "ID1", "Start"),
(0.2, "ID1", None),
(0.3, "ID1", None),
(0.4, "ID1", None),
(0.5, "ID1", None),
(0.0, "ID2", "Boot"),
(0.5, "ID2", None),
(1.0, "ID2", None),
(1.5, "ID2", "Start"),
(2.0, "ID2", None),
(2.5, "ID2", None),
]
df = spark.createDataFrame(data,columns)
df.show()
+----+---+------+
|time| ID|marker|
+----+---+------+
| 0.0|ID1| null|
| 0.1|ID1| Start|
| 0.2|ID1| null|
| 0.3|ID1| null|
| 0.4|ID1| null|
| 0.5|ID1| null|
| 0.0|ID2| Boot|
| 0.5|ID2| null|
| 1.0|ID2| null|
| 1.5|ID2| Start|
| 2.0|ID2| null|
| 2.5|ID2| null|
+----+---+------+
My task is to add the string End to the marker column, at the nearest time after 0.15s have passed since each occurrence of the Start string. For clarification, my desired result is:
+----+---+------+
|time| ID|marker|
+----+---+------+
| 0.0|ID1| null|
| 0.1|ID1| Start| <- Start the clock
| 0.2|ID1| null| <- 0.15s have not elapsed yet
| 0.3|ID1| End| <- 0.15s have elapsed! Write "End"
| 0.4|ID1| null|
| 0.5|ID1| null|
| 0.0|ID2| Boot|
| 0.5|ID2| null|
| 1.0|ID2| null|
| 1.5|ID2| Start| <- Start the clock
| 2.0|ID2| End| <- 0.15s have elapsed! Write "End"
| 2.5|ID2|  null|
+----+---+------+
Research so far
The window function combined with lag seemed most promising, but lag can only look back a fixed number of rows, not a fixed number of seconds. I tried to fake this behavior by first calculating lines_per_second:
import sys
from pyspark.sql import functions as F, Window
w = Window.partitionBy(F.col("ID")).orderBy("time").rangeBetween(-sys.maxsize, sys.maxsize)
lines_per_second = (F.lit(-1) + F.count("time").over(w)) / (F.max("time").over(w) - F.min("time").over(w))
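Attached as a new column, lines_per_second looks like this (df_lps is just an illustrative name for the intermediate frame):
df_lps = df.withColumn("lines_per_second", lines_per_second)  # df_lps is an arbitrary name
df_lps.show()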
+----+---+------+----------------+
|time| ID|marker|lines_per_second|
+----+---+------+----------------+
| 0.0|ID1| null| 10.0|
| 0.1|ID1| Start| 10.0|
| 0.2|ID1| null| 10.0|
| 0.3|ID1| null| 10.0|
| 0.4|ID1| null| 10.0|
| 0.5|ID1| null| 10.0|
| 0.0|ID2| Boot| 2.0|
| 0.5|ID2| null| 2.0|
| 1.0|ID2| null| 2.0|
| 1.5|ID2| Start| 2.0|
| 2.0|ID2| null| 2.0|
| 2.5|ID2| null| 2.0|
+----+---+------+----------------+
But I'm stuck all the same. Any help is greatly appreciated!
Upvotes: 0
Views: 692
Reputation: 42422
See inline comments for explanations:
from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'start',  # Find the start time for each ID
    F.max(
        F.when(F.col('marker') == 'Start', F.col('time'))
    ).over(Window.partitionBy('ID'))
).withColumn(
    'end',  # Add 0.15s elapsed time
    F.col('start') + 0.15
).withColumn(
    'marker2',  # Find the row which is the earliest after 0.15s has elapsed
    F.row_number().over(
        Window.partitionBy('ID')
        .orderBy(F.when(F.col('end') < F.col('time'), F.col('time')).asc_nulls_last())
    ) == 1
).withColumn(
    'marker',  # update marker
    F.coalesce(
        'marker',
        F.when(F.col('marker2'), F.lit('End'))
    )
).select(
    df.columns  # remove temporary columns
).orderBy('ID', 'time')

df2.show()
+----+---+------+
|time| ID|marker|
+----+---+------+
| 0.0|ID1| null|
| 0.1|ID1| Start|
| 0.2|ID1| null|
| 0.3|ID1| End|
| 0.4|ID1| null|
| 0.5|ID1| null|
| 0.0|ID2| Boot|
| 0.5|ID2| null|
| 1.0|ID2| null|
| 1.5|ID2| Start|
| 2.0|ID2| End|
| 2.5|ID2| null|
+----+---+------+
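If you want to verify where the new markers landed, a quick sketch reusing df2 from above is to filter them out:
df2.filter(F.col('marker') == 'End').show()  # expect one 'End' row per ID, at the first time after start + 0.15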
Upvotes: 1