LEJ

Reputation: 1958

Pyspark - Using two time indices for window function

I have a dataframe where each row has two date columns. I would like to create a window function with a `rangeBetween` clause that counts the number of rows in a given range, where BOTH date columns fall within that range. In the case below, both timestamps of a row must be before the corresponding timestamps of the current row for that row to be included in the count.

Example df including the count column:

    +---+-----------+-----------+-----+
    | ID|Timestamp_1|Timestamp_2|Count|
    +---+-----------+-----------+-----+
    |  a|          0|          3|    0|
    |  b|          2|          5|    0|
    |  d|          5|          5|    3|
    |  c|          5|          9|    3|
    |  e|          8|         10|    4|
    +---+-----------+-----------+-----+
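For reference, a toy version of this frame (without the desired `Count` column) can be built roughly as follows; this is a minimal sketch that assumes integer timestamps and an active `SparkSession` named `spark`:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Toy data matching the table above, without the desired Count column
    df = spark.createDataFrame(
        [('a', 0, 3), ('b', 2, 5), ('d', 5, 5), ('c', 5, 9), ('e', 8, 10)],
        ['ID', 'Timestamp_1', 'Timestamp_2'],
    )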

I tried defining two windows and computing the new column over both of them:

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    w_1 = Window.partitionBy().orderBy('Timestamp_1').rangeBetween(Window.unboundedPreceding, 0)
    w_2 = Window.partitionBy().orderBy('Timestamp_2').rangeBetween(Window.unboundedPreceding, 0)

    # Chaining .over() twice like this is not valid
    df = df.withColumn('count', F.count('ID').over(w_1).over(w_2))

However, this is not allowed in PySpark and therefore gives an error.

Any ideas? Solutions in SQL are also fine!

Upvotes: 0

Views: 299

Answers (1)

tpain

Reputation: 144

Would a self-join work?

    from pyspark.sql import functions as F

    # For each row in `a`, count the rows in `b` whose timestamps are both
    # less than or equal to the corresponding timestamps of the `a` row.
    df_count = (
        df.alias('a')
        .join(
            df.alias('b'),
            (F.col('b.Timestamp_1') <= F.col('a.Timestamp_1')) &
            (F.col('b.Timestamp_2') <= F.col('a.Timestamp_2')),
            'left'
        )
        .groupBy(
            'a.ID'
        )
        .agg(
            F.count('b.ID').alias('count')
        )
    )

    # Attach the counts back to the original dataframe
    df = df.join(df_count, 'ID')
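Since the question says SQL is also fine, here is a rough sketch of the same self-join in Spark SQL; it assumes the frame is registered as a temp view named `df` and that a `SparkSession` called `spark` is available. The `<=` comparisons mirror the PySpark version above; switch them to `<` if the current row and ties should be excluded.

    # Register the dataframe as a temp view so it can be queried with SQL
    df.createOrReplaceTempView('df')

    df_count_sql = spark.sql("""
        SELECT a.ID,
               COUNT(b.ID) AS count
        FROM df a
        LEFT JOIN df b
          ON b.Timestamp_1 <= a.Timestamp_1
         AND b.Timestamp_2 <= a.Timestamp_2
        GROUP BY a.ID
    """)

    # Attach the counts back to the original dataframe
    df = df.join(df_count_sql, 'ID')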

Upvotes: 1
