Georg Heiler

Spark SQL sliding window difference computation

How can I compute a sliding window in Spark without resorting to spark streaming?

NOTE: I do not want to use WINDOW PARTITION BY ORDER BY with k ROWS before/after the current one; I want to window by the timestamp. The window operator has such a mode:

from pyspark.sql import SparkSession
from pyspark.sql.functions import min, window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([{"a": "x", "b": "2021-02-21 01:00:00", "c": "3"},
                            {"a": "x", "b": "2021-02-21 02:00:00", "c": "4"},
                            {"a": "x", "b": "2021-02-21 03:00:00", "c": "2"}])

hour_interval = "4 hour"
sliding_window = "60 minute"

# 4-hour time windows, sliding every 60 minutes, grouped per key "a"
df_aggregated_time_window = df.groupBy(
    "a",
    window("b", windowDuration=hour_interval,
           slideDuration=sliding_window, startTime="30 minute")
).agg(min("c").alias("min_c"))
df_aggregated_time_window.show(truncate=False)

+---+------------------------------------------+-----+
|a  |window                                    |min_c|
+---+------------------------------------------+-----+
|x  |[2021-02-20 23:00:00, 2021-02-21 03:00:00]|3    |
|x  |[2021-02-21 00:00:00, 2021-02-21 04:00:00]|2    |
|x  |[2021-02-20 22:00:00, 2021-02-21 02:00:00]|3    |
|x  |[2021-02-21 02:00:00, 2021-02-21 06:00:00]|2    |
|x  |[2021-02-21 01:00:00, 2021-02-21 05:00:00]|2    |
|x  |[2021-02-21 03:00:00, 2021-02-21 07:00:00]|2    |
+---+------------------------------------------+-----+

My desired result would return one output row for each of the 3 input rows: the sliding delta of the 4-hour time-based window (= state), which advances by one hour and is triggered once every hour (though as this is batch, not streaming, triggering should not matter much).

Instead, I get the result above, whose cardinality is greater than the number of desired rows.

Edit:

Desired output:

input:

x,2021-02-21 01:00:00,3
x,2021-02-21 02:00:00,4
x,2021-02-21 03:00:00,4
x,2021-02-21 04:00:00,1

output:

x,2021-02-21 01:00:00,NULL // no previous record to be found within the preceding 3 hours
x,2021-02-21 02:00:00,3 // we only compute `min` here for simplicity (later it should be max - min to get the delta); within the last 3 hours the minimum is 3 (coincidentally the previous row)
x,2021-02-21 03:00:00,3 // within the 4-hour window, 3 is still the smallest
x,2021-02-21 04:00:00,1 // within the previous <= 3 hours (including self), 1 is the smallest


Answers (1)

mazaneicha

I'm afraid your assumption about the window expression is incorrect. According to its documentation:

def window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Column

Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05). ...

So in your case of a 4-hour window with 1-hour sliding steps, there are going to be 6 buckets over which aggregations can be applied:

[2021-02-20 22:00:00, 2021-02-21 02:00:00)  <-- first bucket that contains the earliest b = 2021-02-21 01:00:00
[2021-02-20 23:00:00, 2021-02-21 03:00:00)
[2021-02-21 00:00:00, 2021-02-21 04:00:00)
[2021-02-21 01:00:00, 2021-02-21 05:00:00)
[2021-02-21 02:00:00, 2021-02-21 06:00:00)
[2021-02-21 03:00:00, 2021-02-21 07:00:00) <-- last bucket that contains the latest b = 2021-02-21 03:00:00
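To make the arithmetic concrete, here is a small standalone sketch (a hypothetical helper of my own, not part of Spark's API): every row falls into windowDuration / slideDuration = 4 overlapping windows, so the 3 distinct input hours spread over 6 distinct buckets.

from datetime import datetime, timedelta

def bucket_starts(t: datetime) -> list:
    # Align t to the 1-hour slide, then list the starts of all 4-hour
    # windows [start, start + 4h) that contain t.
    floor = t.replace(minute=0, second=0, microsecond=0)
    return [floor - timedelta(hours=h) for h in range(3, -1, -1)]

print(bucket_starts(datetime(2021, 2, 21, 1)))
# [2021-02-20 22:00, 2021-02-20 23:00, 2021-02-21 00:00, 2021-02-21 01:00]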

I do not entirely understand "I do not want to use a WINDOW PARTITION BY ORDER BY...", because that is exactly what would let you efficiently fulfil your requirement: one output row per input row, computed as the state of the current hour plus the previous 3 hours.
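For what it's worth, here is a minimal sketch of such a range-based window (my own illustration, not code from the question). It assumes column b can be cast to a timestamp; RANGE frame bounds are expressed in the units of the ordering column, i.e. seconds here.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("x", "2021-02-21 01:00:00", 3),
                            ("x", "2021-02-21 02:00:00", 4),
                            ("x", "2021-02-21 03:00:00", 4),
                            ("x", "2021-02-21 04:00:00", 1)],
                           ["a", "b", "c"])

# Order by epoch seconds so the RANGE frame is time-based rather than
# row-based: look back 3 hours (10800 s) up to and including the current row.
w = (Window.partitionBy("a")
     .orderBy(F.col("b").cast("timestamp").cast("long"))
     .rangeBetween(-3 * 3600, Window.currentRow))

df.withColumn("min_c", F.min("c").over(w)).show(truncate=False)

Changing the upper bound to -1 would exclude the current row from the frame, which yields NULL for the first row as in the desired output.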

