bottaio

Reputation: 5093

How do GROUP BY and OVER WINDOW differ in Flink SQL?

One can use windows in Flink SQL in two different ways:

SELECT key, MAX(value)
  FROM table
 GROUP BY key, TUMBLE(ts, INTERVAL '5' MINUTE)

and

SELECT key, MAX(value) OVER w
  FROM table
 WINDOW w AS (PARTITION BY key ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)

I wonder whether these two mechanisms can express the same thing. If not, what is the main difference, and what are possible use cases for each?

Upvotes: 1

Views: 2287

Answers (2)

0x822a5b87

Reputation: 507

Fabian's answer is correct. If you still have questions, you can refer to the following example, which comes from "Rolling Aggregations on Time Series Data":

CREATE TEMPORARY TABLE temperature_measurements (
  measurement_time TIMESTAMP(3),
  city STRING,
  temperature FLOAT, 
  WATERMARK FOR measurement_time AS measurement_time - INTERVAL '15' SECONDS
)
WITH (
  'connector' = 'faker',
  'fields.measurement_time.expression' = '#{date.past ''15'',''SECONDS''}',
  'fields.temperature.expression' = '#{number.numberBetween ''0'',''50''}',
  'fields.city.expression' = '#{regexify ''(Chicago|Munich|Berlin|Portland|Hangzhou|Seattle|Beijing|New York){1}''}'
);

Here is the OVER window usage:

SELECT 
  measurement_time,
  city, 
  temperature,
  AVG(CAST(temperature AS FLOAT)) OVER last_minute AS avg_temperature_minute,
  MAX(temperature) OVER last_minute AS max_temperature_minute,
  MIN(temperature) OVER last_minute AS min_temperature_minute,
  STDDEV(CAST(temperature AS FLOAT)) OVER last_minute AS stdev_temperature_minute
FROM temperature_measurements 
WINDOW last_minute AS (
  PARTITION BY city
  ORDER BY measurement_time
  RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW 
);

The result looks like this:

[Image: result of the OVER window query]
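To make the OVER semantics concrete, here is a minimal Python sketch that emulates the query above without Flink. It assumes timestamps are whole seconds and treats the `RANGE` frame as inclusive on both ends (all rows of the same city with `ts` in `[ts - 60, ts]`), as in standard SQL. `Measurement` and `over_range_window` are hypothetical names, and `STDDEV` is omitted for brevity.

```python
from collections import defaultdict
from typing import NamedTuple


class Measurement(NamedTuple):
    ts: int            # event time in whole seconds (assumption for this sketch)
    city: str
    temperature: float


def over_range_window(rows, range_seconds=60):
    """Emulate AVG/MAX/MIN(temperature) OVER (PARTITION BY city ORDER BY ts
    RANGE BETWEEN range_seconds PRECEDING AND CURRENT ROW).

    Returns exactly one output row per input row.
    """
    # PARTITION BY city: collect each city's rows separately.
    by_city = defaultdict(list)
    for r in rows:
        by_city[r.city].append(r)

    out = []
    # ORDER BY ts: produce results in event-time order.
    for r in sorted(rows, key=lambda m: m.ts):
        # RANGE frame: all rows of the same city with ts in
        # [r.ts - range_seconds, r.ts], including peers with an equal ts.
        frame = [p.temperature for p in by_city[r.city]
                 if r.ts - range_seconds <= p.ts <= r.ts]
        out.append((r.ts, r.city, r.temperature,
                    sum(frame) / len(frame), max(frame), min(frame)))
    return out


rows = [
    Measurement(0, "Berlin", 10.0),
    Measurement(30, "Berlin", 20.0),
    Measurement(90, "Berlin", 30.0),
    Measurement(45, "Munich", 5.0),
]
for row in over_range_window(rows):
    print(row)
```

Four input rows yield four output rows, and each keeps its own `ts` and `temperature`. The Berlin row at 90 seconds no longer sees the reading at 0 seconds, because that reading falls outside the one-minute range.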

And here is the GROUP BY usage:

SELECT
  city,
  AVG(CAST(temperature AS FLOAT)) AS avg_temperature_minute,
  MAX(temperature) AS max_temperature_minute,
  MIN(temperature) AS min_temperature_minute,
  STDDEV(CAST(temperature AS FLOAT)) AS stdev_temperature_minute
FROM temperature_measurements
GROUP BY
  city,
  TUMBLE(measurement_time, INTERVAL '1' MINUTE);

The result looks like this:

[Image: result of the GROUP BY query]
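For comparison, a minimal Python sketch of the GROUP BY query with a one-minute tumbling window. It assumes whole-second timestamps and windows aligned to timestamp 0; `tumble_group_by` is a hypothetical name, and the `window_start`/`window_end` columns are added only for readability (the SQL query above does not select them).

```python
from collections import defaultdict
from typing import NamedTuple


class Measurement(NamedTuple):
    ts: int            # event time in whole seconds (assumption for this sketch)
    city: str
    temperature: float


def tumble_group_by(rows, size_seconds=60):
    """Emulate GROUP BY city, TUMBLE(ts, size_seconds) with AVG/MAX/MIN.

    Returns one output row per (city, window) group.
    """
    groups = defaultdict(list)
    for r in rows:
        # Tumbling windows are fixed-size, non-overlapping buckets aligned to 0.
        window_start = r.ts - r.ts % size_seconds
        groups[(r.city, window_start)].append(r.temperature)

    out = []
    for (city, start), temps in sorted(groups.items()):
        # Individual ts/temperature values are gone: only aggregates survive.
        out.append((city, start, start + size_seconds,
                    sum(temps) / len(temps), max(temps), min(temps)))
    return out


rows = [
    Measurement(0, "Berlin", 10.0),
    Measurement(30, "Berlin", 20.0),
    Measurement(90, "Berlin", 30.0),
    Measurement(45, "Munich", 5.0),
]
for row in tumble_group_by(rows):
    print(row)
```

Here four input rows collapse into three output rows, one per city and minute. There is no per-row `ts` or `temperature` left to select, which is why GROUP BY rejects those columns.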

So, here are the differences:

  1. With GROUP BY, we cannot select measurement_time or temperature, because these columns are neither grouped nor aggregated (the validator reports "Expression 'measurement_time' is not being grouped").
  2. The OVER window produces an aggregated value for every input row, whereas GROUP BY produces one aggregated value per group (here, per city and one-minute window).

Upvotes: 0

Fabian Hueske

Reputation: 18987

The two queries compute different results, which correspond to the semantics of regular SQL. So the difference is not Flink-specific; it is defined by the SQL standard.


The first query

SELECT key, MAX(value)
  FROM table
 GROUP BY key, TUMBLE(ts, INTERVAL '5' MINUTE)

groups records by key and into 5-minute buckets. Every 5 minutes, the query produces one row per key value observed in that bucket, holding the maximum value for that bucket. For each group, multiple rows are aggregated into a single row.
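The "every 5 minutes" part is about when results appear. In Flink, event-time windows are closed by watermarks (declared with a `WATERMARK FOR ...` clause in the table DDL). The following is a simplified Python model, not Flink's actual operator. It assumes whole-second timestamps and a watermark equal to the largest timestamp seen minus a fixed `max_delay`. It also assumes that a window is emitted as soon as the watermark reaches its end and that late events are simply dropped. `tumbling_max_stream` and its parameters are hypothetical.

```python
def tumbling_max_stream(events, size=300, max_delay=0):
    """Simplified model of SELECT key, MAX(value) ... GROUP BY key, TUMBLE(ts, size).

    events:    (ts_seconds, key, value) tuples in arrival order.
    max_delay: bounded out-of-orderness; watermark = max ts seen - max_delay.
    Yields (key, window_start, window_end, max_value) once per window and key.
    """
    open_windows = {}            # (window_start, key) -> max value so far
    watermark = float("-inf")
    for ts, key, value in events:
        start = ts - ts % size
        if start + size <= watermark:
            continue             # late event: its window was already emitted
        slot = (start, key)
        open_windows[slot] = max(open_windows.get(slot, value), value)
        watermark = max(watermark, ts - max_delay)
        # Emit (and forget) every window whose end the watermark has reached.
        for s, k in sorted(open_windows):
            if s + size <= watermark:
                yield k, s, s + size, open_windows.pop((s, k))
    # End of input acts like a final watermark of +infinity.
    for s, k in sorted(open_windows):
        yield k, s, s + size, open_windows[(s, k)]


events = [(10, "a", 1), (100, "b", 7), (200, "a", 5),
          (310, "a", 2), (320, "b", 3), (620, "a", 9)]
for row in tumbling_max_stream(events):
    print(row)
```

Each (key, window) pair is emitted exactly once, and only after the watermark passes the window's end. For example, the rows for the window [0, 300) appear only when the event at ts 310 advances the watermark.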


The second query

SELECT key, MAX(value) OVER w
  FROM table
 WINDOW w AS (PARTITION BY key ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)

produces one row for every row of the input table. Each result row holds the maximum value observed so far for its key (rows are ordered by ts). Note that multiple rows are not aggregated into a single row; each input row results in exactly one output row. Moreover, the range of the maximum aggregation is not limited to 5 minutes: in this example, it covers the whole key partition up to the current row.
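A minimal Python sketch of the second query's semantics, assuming whole-second timestamps. `running_max_over` is a hypothetical name, and the input `value` is echoed in each output row only for readability (the SQL query selects just `key` and the aggregate).

```python
def running_max_over(rows):
    """Emulate MAX(value) OVER (PARTITION BY key ORDER BY ts
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).

    rows: (ts_seconds, key, value) tuples.
    Returns one (ts, key, value, running_max) row per input row.
    """
    best = {}                    # key -> maximum seen so far in that partition
    out = []
    for ts, key, value in sorted(rows, key=lambda row: row[0]):
        # The frame starts at the first row of the partition, so the state
        # is never trimmed: old maxima stay visible however long ago they were.
        best[key] = max(best.get(key, value), value)
        out.append((ts, key, value, best[key]))
    return out


rows = [(0, "a", 3), (60, "b", 1), (400, "a", 2), (900, "a", 8)]
for row in running_max_over(rows):
    print(row)
```

The row at ts 400 still reports 3, a value observed 400 seconds (more than 5 minutes) earlier. This shows that the aggregation range spans the whole key partition rather than a fixed bucket.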

Upvotes: 2
