One can use windows in Flink SQL in two different ways:
SELECT key, MAX(value)
FROM table
GROUP BY key, TUMBLE(ts, INTERVAL '5' MINUTE)
and
SELECT key, MAX(value) OVER w
FROM table
WINDOW w AS (PARTITION BY key ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
I wonder whether these two mechanisms can express the same thing. If not, what is the main difference, and what are the possible use cases for each?
Fabian's answer is correct. If you still have questions, you can refer to the following example, which comes from "Rolling Aggregations on Time Series Data".
CREATE TEMPORARY TABLE temperature_measurements (
measurement_time TIMESTAMP(3),
city STRING,
temperature FLOAT,
WATERMARK FOR measurement_time AS measurement_time - INTERVAL '15' SECONDS
)
WITH (
'connector' = 'faker',
'fields.measurement_time.expression' = '#{date.past ''15'',''SECONDS''}',
'fields.temperature.expression' = '#{number.numberBetween ''0'',''50''}',
'fields.city.expression' = '#{regexify ''(Chicago|Munich|Berlin|Portland|Hangzhou|Seattle|Beijing|New York){1}''}'
);
Here is the OVER window usage:
SELECT
measurement_time,
city,
temperature,
AVG(CAST(temperature AS FLOAT)) OVER last_minute AS avg_temperature_minute,
MAX(temperature) OVER last_minute AS max_temperature_minute,
MIN(temperature) OVER last_minute AS min_temperature_minute,
STDDEV(CAST(temperature AS FLOAT)) OVER last_minute AS stdev_temperature_minute
FROM temperature_measurements
WINDOW last_minute AS (
PARTITION BY city
ORDER BY measurement_time
RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
);
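To make the OVER semantics concrete, here is a plain-Python model of the query above. It is not Flink code. It assumes timestamps are whole seconds, processes rows in timestamp order, and ignores watermarks and late data. The names `Measurement` and `over_last_minute` are hypothetical, and STDDEV is omitted for brevity.

```python
from collections import defaultdict
from typing import NamedTuple


class Measurement(NamedTuple):
    ts: int  # event time in whole seconds (simplified stand-in for TIMESTAMP(3))
    city: str
    temperature: float


def over_last_minute(rows, range_seconds=60):
    """Model of AVG/MAX/MIN(temperature) OVER (PARTITION BY city
    ORDER BY ts RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW).

    Emits exactly one output row per input row; STDDEV is left out for brevity.
    """
    by_city = defaultdict(list)  # PARTITION BY city
    for r in rows:
        by_city[r.city].append(r)

    out = []
    for r in sorted(rows, key=lambda m: m.ts):  # ORDER BY ts
        # RANGE frame: same-city rows with ts in [r.ts - range_seconds, r.ts];
        # rows sharing r.ts are peers and are included as well.
        frame = [m.temperature for m in by_city[r.city]
                 if r.ts - range_seconds <= m.ts <= r.ts]
        out.append((r.ts, r.city, r.temperature,
                    sum(frame) / len(frame), max(frame), min(frame)))
    return out


rows = [
    Measurement(0, "Berlin", 10.0),
    Measurement(30, "Berlin", 20.0),
    Measurement(45, "Munich", 5.0),
    Measurement(90, "Berlin", 30.0),
]
result = over_last_minute(rows)
for row in result:
    print(row)
```

Four input rows produce four output rows. For the Berlin row at ts=90, the frame covers only ts 30..90, so the row at ts=0 no longer contributes.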
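The OVER query returns one output row per input measurement. Each row carries the average, maximum, minimum, and standard deviation of that city's temperatures over the minute ending at that measurement.
And here is the GROUP BY usage: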
SELECT
city,
AVG(CAST(temperature AS FLOAT)) AS avg_temperature_minute,
MAX(temperature) AS max_temperature_minute,
MIN(temperature) AS min_temperature_minute,
STDDEV(CAST(temperature AS FLOAT)) AS stdev_temperature_minute
FROM temperature_measurements
GROUP BY
city,
TUMBLE(measurement_time, INTERVAL '1' MINUTES);
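The GROUP BY query returns one row per city for each one-minute window.
So here is the final difference: the OVER query emits a row for every measurement, aggregating over the minute that ends at that measurement, whereas the GROUP BY query emits a single row per city per one-minute tumbling window.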
The two queries compute different results, which follow the semantics of regular SQL. So the difference is not Flink-specific but defined by the SQL standard.
The first query
SELECT key, MAX(value)
FROM table
GROUP BY key, TUMBLE(ts, INTERVAL '5' MINUTE)
groups records per key and 5-minute bucket. Every 5 minutes, the query produces one row per key value with the maximum value. For each group, multiple rows are aggregated into a single row.
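The grouping can be modeled in a few lines of plain Python. This is a sketch of the semantics, not Flink's implementation. It assumes epoch-aligned windows with no offset, integer-second timestamps, and no watermarks or late data. The helpers `tumble_start` and `group_by_tumble_max` are hypothetical names. The window bounds are included in the output only for readability. The query itself selects just key and MAX(value), although Flink exposes the bounds via TUMBLE_START and TUMBLE_END.

```python
from collections import defaultdict


def tumble_start(ts, size):
    # Epoch-aligned tumbling window [start, start + size) containing ts.
    return ts - ts % size


def group_by_tumble_max(rows, size=300):
    """Model of: SELECT key, MAX(value) FROM t
    GROUP BY key, TUMBLE(ts, INTERVAL '5' MINUTE).

    rows: iterable of (key, ts_seconds, value). Emits one row per (key, window).
    """
    groups = defaultdict(list)
    for key, ts, value in rows:
        groups[(key, tumble_start(ts, size))].append(value)
    # Each group collapses to a single row: (window_start, window_end, key, max).
    return sorted(
        (start, start + size, key, max(values))
        for (key, start), values in groups.items()
    )


rows = [
    ("a", 10, 3), ("a", 200, 7), ("b", 250, 1),
    ("a", 310, 2), ("b", 599, 4), ("b", 600, 9),
]
result = group_by_tumble_max(rows)
for row in result:
    print(row)
```

Six input rows collapse to five output rows. Because windows are half-open, the row at ts=599 lands in [300, 600) while the row at ts=600 starts the next window.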
The second query
SELECT key, MAX(value) OVER w
FROM table
WINDOW w AS (PARTITION BY key ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
produces one row for every row of the input (table). A result row has the maximum value for the key value that was observed so far (rows are ordered by ts). Note that multiple rows are not aggregated into a single row; each input row results in one output row. Moreover, the range of the maximum aggregation can be more than 5 minutes. In fact, it is the whole key partition in this example.
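For comparison, here is a plain-Python model of the second query, run on the same six rows as a tumbling-window sketch would use. It is a sketch of the semantics only. It assumes rows arrive in ts order with no ties, and it ignores watermarks. The name `running_max_over` is hypothetical. Because the frame is ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, a single per-key running maximum is enough state.

```python
def running_max_over(rows):
    """Model of: SELECT key, MAX(value) OVER (PARTITION BY key ORDER BY ts
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).

    rows: iterable of (key, ts_seconds, value). Emits one row per input row.
    """
    best = {}  # per-key maximum over the unbounded frame seen so far
    out = []
    for key, ts, value in sorted(rows, key=lambda r: r[1]):  # ORDER BY ts
        best[key] = max(best.get(key, value), value)
        out.append((ts, key, best[key]))
    return out


rows = [
    ("a", 10, 3), ("a", 200, 7), ("b", 250, 1),
    ("a", 310, 2), ("b", 599, 4), ("b", 600, 9),
]
result = running_max_over(rows)
for row in result:
    print(row)
```

Every input row yields an output row. The row for key "a" at ts=310 reports 7, a value observed at ts=200. That illustrates how the aggregation range is the whole partition rather than a 5-minute bucket.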