Stergios

Reputation: 3196

PySpark window with condition

I have a dataset with application logs that show when a certain app was launched or closed. Sometimes, the related events may be missing entirely from the logs. I want to match each app start with the related end event (if it exists).

Here's an illustrative dataset:

import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame([['Group1', 'Logon', 'Name1', '2021-02-05T19:03:00.000+0000'],
                            ['Group1', 'Start', 'Name1', '2021-02-05T19:04:00.000+0000'],
                            ['Group1', 'Start', 'Name1', '2021-02-05T19:05:00.000+0000'],
                            ['Group1', 'End', 'Name1', '2021-02-05T19:06:00.000+0000'],
                            ['Group1', 'End', 'Name3', '2021-02-05T19:06:01.000+0000'],
                            ['Group1', 'End', 'Name1', '2021-02-05T19:07:00.000+0000'],
                            ['Group2', 'Start', 'Name1', '2021-02-05T19:04:00.000+0000'],
                            ['Group2', 'Start', 'Name1', '2021-02-05T19:05:00.000+0000'],
                            ['Group2', 'Start', 'Name2', '2021-02-05T19:06:00.000+0000'],
                            ['Group2', 'End', 'Name1', '2021-02-05T19:07:00.000+0000'],
                            ['Group2', 'Close', 'Name1', '2021-02-05T19:07:00.000+0000'],
                            ], ['group', 'type', 'name', 'time'])

df = df.withColumn('time', F.col('time').cast('timestamp'))

Within each group, I want to assign a common identifier to each pair of 'Start' and 'End' events that share the same 'name'. In other words, for each 'Start' event I want to find the first 'End' event that has not already been matched to another 'Start' event.
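
The matching rule described here can be pinned down with a small plain-Python reference, which may help when checking a Spark solution against small samples. This is only a sketch: the function name `match_events`, the tuple layout, and the `group_name_n` label format are assumptions made for illustration, and it assumes the timestamps are ISO strings in a single format so that they sort chronologically.

```python
from collections import defaultdict, deque

# Sample rows from the question as (group, type, name, time) tuples.
rows = [
    ('Group1', 'Logon', 'Name1', '2021-02-05T19:03:00'),
    ('Group1', 'Start', 'Name1', '2021-02-05T19:04:00'),
    ('Group1', 'Start', 'Name1', '2021-02-05T19:05:00'),
    ('Group1', 'End',   'Name1', '2021-02-05T19:06:00'),
    ('Group1', 'End',   'Name3', '2021-02-05T19:06:01'),
    ('Group1', 'End',   'Name1', '2021-02-05T19:07:00'),
    ('Group2', 'Start', 'Name1', '2021-02-05T19:04:00'),
    ('Group2', 'Start', 'Name1', '2021-02-05T19:05:00'),
    ('Group2', 'Start', 'Name2', '2021-02-05T19:06:00'),
    ('Group2', 'End',   'Name1', '2021-02-05T19:07:00'),
    ('Group2', 'Close', 'Name1', '2021-02-05T19:07:00'),
]


def match_events(rows):
    """Pair each 'End' with the oldest unmatched 'Start' of the same (group, name).

    Returns the rows sorted by (group, time) and a parallel list of labels,
    where unmatched rows get None.
    """
    ordered = sorted(rows, key=lambda r: (r[0], r[3]))
    open_starts = defaultdict(deque)  # (group, name) -> indices of unmatched Starts
    pair_count = defaultdict(int)     # (group, name) -> pairs formed so far
    labels = [None] * len(ordered)

    for i, (group, typ, name, _time) in enumerate(ordered):
        key = (group, name)
        if typ == 'Start':
            open_starts[key].append(i)
        elif typ == 'End' and open_starts[key]:
            start_idx = open_starts[key].popleft()
            pair_count[key] += 1
            label = f'{group}_{name}_{pair_count[key]}'
            labels[start_idx] = label
            labels[i] = label
    return ordered, labels


ordered, labels = match_events(rows)
for row, label in zip(ordered, labels):
    print(row, label)
```

An 'End' with no earlier unmatched 'Start' (such as the 'Name3' row) and a 'Start' with no later 'End' both stay unlabeled, as do other event types such as 'Logon' and 'Close'.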

The expected result could be something like the following picture:

[Image: table showing the expected result]

I don't mind if the identifier (i.e. 'my_group') is an ID, a timestamp or if it is monotonically increasing across groups. I just want to be able to match the relevant events within each group.

What I've tried

I thought about using window functions to find the end time for each 'Start' event and the start time for each 'End' event. However, I cannot restrict the search to 'End' events only (or to 'Start' events, respectively). I also cannot express the logic described above: finding the first 'End' event that has not already been matched to another 'Start' event.

Here's my code:

app_session_window_down = Window.partitionBy('group', "name").orderBy(F.col("time").cast('long')).rangeBetween(1, Window.unboundedFollowing) #search in the future
app_session_window_up = Window.partitionBy('group', "name").orderBy(F.col("time").cast('long')).rangeBetween(Window.unboundedPreceding, -1) #search in the past

df = df.withColumn("app_time_end", F.when((F.col("type") == 'Start'), F.first(F.col('time'), ignorenulls=True).over(app_session_window_down)).otherwise(F.lit('None')))\
    .withColumn("app_time_start", F.when((F.col("type") == 'End'), F.last(F.col('time'), ignorenulls=True).over(app_session_window_up)).otherwise(F.col('app_time_end')))

which gives:

[Image: table showing the output of the code above]

This is nowhere close to what I want to achieve. Any hints?

Upvotes: 1

Views: 1485

Answers (1)

mck

Reputation: 42332

Explanations are in the inline comments:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'my_group',    # the column you wanted
    F.when(
        F.col('type').isin(['Start', 'End']),
        F.row_number().over(Window.partitionBy('group', 'name', 'type').orderBy('time'))
    )
).withColumn(
    'max_group',    # helper column: number of complete Start/End pairs per (group, name), i.e. the smaller of the two maximum row numbers; used below
    F.least(
        F.max(
            F.when(
                F.col('type') == 'Start', F.col('my_group')
            ).otherwise(0)
        ).over(Window.partitionBy('group', 'name')),
        F.max(
            F.when(
                F.col('type') == 'End', F.col('my_group')
            ).otherwise(0)
        ).over(Window.partitionBy('group', 'name'))
    )
).withColumn(
    'my_group',    # mask the rows which don't have corresponding 'start'/'end'
    F.when(
        F.col('my_group') <= F.col('max_group'),
        F.col('my_group')
    )
).withColumn(
    'my_group',    # add the group name
    F.when(F.col('my_group').isNotNull(), F.concat_ws('_', 'group', 'name', 'my_group'))
).drop('max_group').orderBy('group', 'time')
df2.show()
+------+-----+-----+-------------------+--------------+
| group| type| name|               time|      my_group|
+------+-----+-----+-------------------+--------------+
|Group1|Logon|Name1|2021-02-05 19:03:00|          null|
|Group1|Start|Name1|2021-02-05 19:04:00|Group1_Name1_1|
|Group1|Start|Name1|2021-02-05 19:05:00|Group1_Name1_2|
|Group1|  End|Name1|2021-02-05 19:06:00|Group1_Name1_1|
|Group1|  End|Name3|2021-02-05 19:06:01|          null|
|Group1|  End|Name1|2021-02-05 19:07:00|Group1_Name1_2|
|Group2|Start|Name1|2021-02-05 19:04:00|Group2_Name1_1|
|Group2|Start|Name1|2021-02-05 19:05:00|          null|
|Group2|Start|Name2|2021-02-05 19:06:00|          null|
|Group2|  End|Name1|2021-02-05 19:07:00|Group2_Name1_1|
|Group2|Close|Name1|2021-02-05 19:07:00|          null|
+------+-----+-----+-------------------+--------------+
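
To see what the row-number approach computes without a Spark session, here is a plain-Python mirror of the same steps. The function name `rank_pairs` and the tuple layout are assumptions made for this sketch and are not part of the answer. It numbers 'Start' and 'End' rows separately within each (group, name), keeps only the numbers up to the smaller of the two counts, and builds the same label. Like the Spark version, it pairs the k-th 'Start' with the k-th 'End' by rank only and does not check that the 'End' comes after the 'Start'.

```python
from collections import defaultdict


def rank_pairs(rows):
    """rows: iterable of (group, type, name, time). Returns labels in input order."""
    rows = list(rows)

    # Equivalent of row_number() over (group, name, type) ordered by time.
    by_key = defaultdict(list)
    for idx, (group, typ, name, time) in enumerate(rows):
        if typ in ('Start', 'End'):
            by_key[(group, name, typ)].append((time, idx))
    rank = {}
    for items in by_key.values():
        for n, (_time, idx) in enumerate(sorted(items), start=1):
            rank[idx] = n

    labels = []
    for idx, (group, typ, name, _time) in enumerate(rows):
        # Equivalent of least(max Start number, max End number):
        # the number of complete pairs for this (group, name).
        pairs = min(len(by_key.get((group, name, 'Start'), [])),
                    len(by_key.get((group, name, 'End'), [])))
        n = rank.get(idx)
        labels.append(f'{group}_{name}_{n}' if n is not None and n <= pairs else None)
    return labels


# Hypothetical edge case: an 'End' logged before any 'Start' is still paired by rank.
sample = [
    ('Group1', 'End',   'Name1', '2021-02-05T19:00:00'),
    ('Group1', 'Start', 'Name1', '2021-02-05T19:01:00'),
]
labels = rank_pairs(sample)
print(labels)
```

If the logs can contain an 'End' that precedes every 'Start' for the same name, the paired rows may need an extra check that the 'End' time is later than the 'Start' time.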

Upvotes: 3
