Reputation: 11
We are trying to run a Flink SQL query that deduplicates its input and then windows and aggregates the deduplicated result, but we are running into the following error at query planning time:
org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[FirstRow], key=[order_id], order=[ROWTIME])
We managed to reduce this to a simple example query that reproduces the issue:
CREATE TABLE Orders (
    order_id STRING,
    user_id STRING,
    product STRING,
    num BIGINT,
    user_action_time TIMESTAMP(3),
    WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen');
WITH Deduplicated AS (
    SELECT
        order_id,
        user_id,
        product,
        num,
        user_action_time
    FROM (
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY order_id
                ORDER BY user_action_time ASC
            ) AS row_num
        FROM Orders
    )
    WHERE row_num = 1
)
SELECT
    user_id,
    SUM(num) AS num_sum
FROM TABLE(
    TUMBLE(
        TABLE Deduplicated,
        DESCRIPTOR(user_action_time),
        INTERVAL '5' MINUTES
    )
)
GROUP BY
    user_id,
    window_start,
    window_end;
If the same query is run using PROCTIME instead of ROWTIME, the query runs successfully.
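For reference, the working variant looks roughly like this (a sketch only; the proc_time column name and Orders_Proctime table name are illustrative):

-- Processing-time attribute declared via a computed PROCTIME() column
-- instead of a rowtime column with a watermark.
CREATE TABLE Orders_Proctime (
    order_id STRING,
    user_id STRING,
    product STRING,
    num BIGINT,
    proc_time AS PROCTIME()
) WITH ('connector' = 'datagen');

-- The deduplication then uses ORDER BY proc_time and the window TVF uses
-- DESCRIPTOR(proc_time); with those changes the query plans successfully.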
We are using Flink 1.15.0.
Is this expected behavior?
Upvotes: 0
Views: 382
Reputation: 2108
This is currently not possible. You can track https://issues.apache.org/jira/browse/FLINK-27539 (it is not yet clear whether this will be treated as a bug or a new feature for Flink).
Upvotes: 0
Reputation: 401
Can you try this? I am not sure what the problem is.
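-- Note: this rewrites the aggregation using the legacy group-window syntax
-- (GROUP BY TUMBLE(...)) instead of the TUMBLE windowing TVF used above.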
WITH Deduplicated AS (
    SELECT
        order_id,
        user_id,
        product,
        num,
        user_action_time
    FROM (
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY order_id
                ORDER BY user_action_time ASC
            ) AS row_num
        FROM Orders
    )
    WHERE row_num = 1
)
SELECT
    user_id,
    SUM(num) AS num_sum
FROM Deduplicated
GROUP BY user_id, TUMBLE(user_action_time, INTERVAL '5' MINUTES);
Upvotes: 0