Martin Bomio

Reputation: 11

Flink SQL deduplication before a window TVF

We are trying to run a Flink SQL query that deduplicates rows and then windows and aggregates the deduplicated result, but we run into the following error at query planning time:

org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[FirstRow], key=[order_id], order=[ROWTIME])

We reduced the problem to a simple query that reproduces the issue:

CREATE TABLE Orders (
  order_id STRING, 
  user_id STRING, 
  product STRING, 
  num BIGINT, 
  user_action_time TIMESTAMP(3), 
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen');

WITH Deduplicated AS (
  SELECT 
    order_id, 
    user_id, 
    product, 
    num, 
    user_action_time 
  FROM 
    (
      SELECT 
        *, 
        ROW_NUMBER() OVER (
          PARTITION BY order_id 
          ORDER BY 
            user_action_time ASC
        ) AS row_num 
      FROM 
        Orders
    ) 
  WHERE 
    row_num = 1
) 
SELECT 
  user_id, 
  SUM(num) as num_sum 
FROM 
  TABLE(
    TUMBLE(
      TABLE Deduplicated, 
      DESCRIPTOR(user_action_time), 
      INTERVAL '5' MINUTES
    )
  ) 
GROUP BY 
  user_id, 
  window_start, 
  window_end

If the same query is run using PROCTIME instead of ROWTIME, the query runs successfully.

We are using Flink 1.15.0.

Is this expected behavior?

Upvotes: 0

Views: 382

Answers (2)

Martijn Visser

Reputation: 2108

This is currently not possible. You can track https://issues.apache.org/jira/browse/FLINK-27539 (it is not yet clear whether this is a bug or a new feature for Flink).
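
To illustrate why the planner rejects the query, here is a simplified Python model of keep-first-row deduplication. It is only a sketch and not Flink's actual operator; the function name first_row_changelog and the sample events are made up for illustration. With arrival (processing-time) order, the first row seen for a key is final, so the output is insert-only. With event-time order, a row that arrives later but carries an earlier timestamp replaces the row already emitted, so the output contains update changes, which the window aggregate named in the error message does not accept.

```python
from typing import Dict, List, Tuple

Event = Tuple[str, int]        # (order_id, event_time), listed in arrival order
Change = Tuple[str, str, int]  # (change kind, order_id, event_time)


def first_row_changelog(events: List[Event], by_event_time: bool) -> List[Change]:
    """Simplified model of 'keep first row per order_id' (hypothetical helper).

    by_event_time=False: the first row to arrive wins (processing-time order).
    by_event_time=True:  the row with the smallest event time wins, so a late
                         arrival with an earlier timestamp replaces the
                         previously emitted row.
    """
    state: Dict[str, int] = {}   # order_id -> event time of the current first row
    out: List[Change] = []
    for order_id, ts in events:
        current = state.get(order_id)
        if current is None:
            # First row seen for this key: plain insert.
            state[order_id] = ts
            out.append(("+I", order_id, ts))
        elif by_event_time and ts < current:
            # An earlier event arrived late: retract the old row, emit the new one.
            state[order_id] = ts
            out.append(("-U", order_id, current))
            out.append(("+U", order_id, ts))
        # Otherwise the row is a duplicate and is dropped.
    return out


# Order o1 arrives twice; the second arrival has the earlier event time.
events = [("o1", 10), ("o1", 7), ("o2", 5)]

print(first_row_changelog(events, by_event_time=False))
# [('+I', 'o1', 10), ('+I', 'o2', 5)]

print(first_row_changelog(events, by_event_time=True))
# [('+I', 'o1', 10), ('-U', 'o1', 10), ('+U', 'o1', 7), ('+I', 'o2', 5)]
```

The second output contains -U/+U entries. In this model, that is the kind of changelog the ROWTIME deduplication hands to the window aggregate, while the PROCTIME variant stays insert-only.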

Upvotes: 0

Metehan Yıldırım

Reputation: 401

Can you try this? I am not sure what the problem is.

WITH Deduplicated AS (
  SELECT 
    order_id, 
    user_id, 
    product, 
    num, 
    user_action_time 
  FROM 
    (
      SELECT 
        *, 
        ROW_NUMBER() OVER (
          PARTITION BY order_id 
          ORDER BY 
            user_action_time ASC
        ) AS row_num 
      FROM 
        Orders
    ) 
  WHERE 
    row_num = 1
) 
SELECT 
  user_id, 
  SUM(num) as num_sum 
FROM Deduplicated
GROUP BY user_id, TUMBLE(user_action_time, INTERVAL '5' MINUTES);

Upvotes: 0
