bolla

Reputation: 369

Change a pyspark dataframe column at a fixed time offset after a given line

Short Version

The task is to "go through" a pyspark dataframe, ordered by time. Whenever a keyword appears, we need to add another keyword in the same column a fixed time later.

Example

Consider the following data frame:

columns = ["time","ID","marker"]
data = [
    (0.0, "ID1", None), 
    (0.1, "ID1", "Start"), 
    (0.2, "ID1", None), 
    (0.3, "ID1", None), 
    (0.4, "ID1", None), 
    (0.5, "ID1", None), 
    
    (0.0, "ID2", "Boot"), 
    (0.5, "ID2", None), 
    (1.0, "ID2", None), 
    (1.5, "ID2", "Start"), 
    (2.0, "ID2", None), 
    (2.5, "ID2", None), 
    ]
df = spark.createDataFrame(data,columns)
df.show()

+----+---+------+
|time| ID|marker|
+----+---+------+
| 0.0|ID1|  null|
| 0.1|ID1| Start|
| 0.2|ID1|  null|
| 0.3|ID1|  null|
| 0.4|ID1|  null|
| 0.5|ID1|  null|
| 0.0|ID2|  Boot|
| 0.5|ID2|  null|
| 1.0|ID2|  null|
| 1.5|ID2| Start|
| 2.0|ID2|  null|
| 2.5|ID2|  null|
+----+---+------+

My task is to add the string End to the marker column at the nearest time after 0.15s have passed since each occurrence of the Start string. For clarification, my desired result is:

+----+---+------+
|time| ID|marker|
+----+---+------+
| 0.0|ID1|  null|
| 0.1|ID1| Start| <- Start the clock
| 0.2|ID1|  null| <- 0.15s have not elapsed yet
| 0.3|ID1|   End| <- 0.15s have elapsed! Write "End"
| 0.4|ID1|  null|
| 0.5|ID1|  null|
| 0.0|ID2|  Boot|
| 0.5|ID2|  null|
| 1.0|ID2|  null|
| 1.5|ID2| Start| <- Start the clock
| 2.0|ID2|   End| <- 0.15s have elapsed! Write "End"
| 2.5|ID2|  null|
+----+---+------+

Research so far

The window function combined with lag seemed most promising, but lag only shifts by a fixed number of rows, not by seconds. I tried to fake time-based behavior by first calculating lines_per_second:

import sys
from pyspark.sql import functions as F, Window

w = Window.partitionBy(F.col("ID")).orderBy("time").rangeBetween(-sys.maxsize, sys.maxsize)
lines_per_second = (F.lit(-1) + F.count("time").over(w)) / (F.max("time").over(w) - F.min("time").over(w))
df.withColumn("lines_per_second", lines_per_second).show()

+----+---+------+----------------+
|time| ID|marker|lines_per_second|
+----+---+------+----------------+
| 0.0|ID1|  null|            10.0|
| 0.1|ID1| Start|            10.0|
| 0.2|ID1|  null|            10.0|
| 0.3|ID1|  null|            10.0|
| 0.4|ID1|  null|            10.0|
| 0.5|ID1|  null|            10.0|
| 0.0|ID2|  Boot|             2.0|
| 0.5|ID2|  null|             2.0|
| 1.0|ID2|  null|             2.0|
| 1.5|ID2| Start|             2.0|
| 2.0|ID2|  null|             2.0|
| 2.5|ID2|  null|             2.0|
+----+---+------+----------------+
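
To make the dead end concrete, here is a fixed-offset sketch (the column name is only illustrative):

from pyspark.sql import functions as F, Window

w_rows = Window.partitionBy("ID").orderBy("time")

# A constant 2-row lag reproduces the 0.15s gap only where the rate is
# 10 lines per second (ID1); ID2 would need an offset of 1 row instead.
df_lagged = df.withColumn("marker_2_rows_ago", F.lag("marker", 2).over(w_rows))

# An offset that varies per ID is what I would need, but lag's offset must be
# a plain Python int, so the per-ID lines_per_second value cannot be used there.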

But I'm stuck all the same. Any help is greatly appreciated!

Upvotes: 0

Views: 692

Answers (1)

mck

Reputation: 42422

See inline comments for explanations:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'start',    # Find the start time for each ID
    F.max(
        F.when(F.col('marker') == 'Start', F.col('time'))
    ).over(Window.partitionBy('ID'))
).withColumn(
    'end',     # Add 0.15s elapsed time
    F.col('start') + 0.15
).withColumn(
    'marker2',     # Find the row which is the earliest after 0.15s has elapsed
    F.row_number().over(
        Window.partitionBy('ID')
              .orderBy(F.when(F.col('end') < F.col('time'), F.col('time')).asc_nulls_last())
    ) == 1
).withColumn(
    'marker',    # update marker
    F.coalesce(
        'marker', 
        F.when(F.col('marker2'), F.lit('End'))
    )
).select(
    df.columns    # remove temporary columns
).orderBy('ID', 'time')

df2.show()
+----+---+------+
|time| ID|marker|
+----+---+------+
| 0.0|ID1|  null|
| 0.1|ID1| Start|
| 0.2|ID1|  null|
| 0.3|ID1|   End|
| 0.4|ID1|  null|
| 0.5|ID1|  null|
| 0.0|ID2|  Boot|
| 0.5|ID2|  null|
| 1.0|ID2|  null|
| 1.5|ID2| Start|
| 2.0|ID2|   End|
| 2.5|ID2|  null|
+----+---+------+
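
Note that this takes the single (maximum) Start time per ID. If an ID could contain several Start markers, one variation (a sketch along the same lines, not taken from the original answer) would be to forward-fill the most recent Start time and flag the first row past the threshold:

from pyspark.sql import functions as F, Window

w = Window.partitionBy('ID').orderBy('time')

df3 = df.withColumn(
    'last_start',   # carry the most recent Start time forward within each ID
    F.last(F.when(F.col('marker') == 'Start', F.col('time')), ignorenulls=True).over(w)
).withColumn(
    'elapsed',      # True once at least 0.15s have passed since that Start
    F.col('time') >= F.col('last_start') + 0.15
).withColumn(
    'marker',       # mark only the first row after the threshold is crossed
    F.coalesce(
        'marker',
        F.when(
            F.col('elapsed') & ~F.coalesce(F.lag('elapsed').over(w), F.lit(False)),
            F.lit('End')
        )
    )
).select(df.columns).orderBy('ID', 'time')

On the sample data this should produce the same output as df2.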

Upvotes: 1
