user3822232

Reputation: 315

How to do content-based deduplication using Flink SQL

I have my Flink SQL statements as follows:

CREATE OR REPLACE TABLE table_one /** mode('streaming')*/
(
    `pk` STRING,
    `id` STRING,
    `segments` ARRAY<STRING>,
    `headers` MAP<STRING, BYTES> METADATA,
    `kafka_key` STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
    WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
    `hard_deleted` BOOLEAN
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'kafka:29092',
    'properties.group.id' = 'grp1',
    'topic-pattern' = 'topic_one',
    'value.format' = 'json',
    'key.format' = 'raw',
    'key.fields' = 'kafka_key',
    'value.fields-include' = 'EXCEPT_KEY',
    'scan.startup.mode' = 'earliest-offset',
    'json.timestamp-format.standard' = 'ISO-8601',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

CREATE OR REPLACE VIEW table_one_source AS (
SELECT cast(`headers`['pod'] as varchar)            as pod,
       cast(`headers`['org'] as varchar)            as org,
       cast(`headers`['tenantId'] as varchar)       as tenantId,
       kafka_key                                    as pk,
       COALESCE(id, SPLIT_INDEX(kafka_key, '#', 1)) as id,
       segments,
       ts
FROM table_one
WHERE `headers`['tenantId'] is not null
  AND `headers`['pod'] is not null
  AND `headers`['org'] is not null
);

CREATE OR REPLACE VIEW table_one_source_keyed AS (
    WITH table_one_source_hash AS (
        -- hash the content we care about, per (tenantId, id) key
        SELECT
            pod, org, tenantId, pk, id, segments, ts,
            HASH_CODE(tenantId || id || CAST(segments AS STRING)) AS data_hash
        FROM table_one_source
    ),
    entitlement_source_deduped AS (
        SELECT *
        FROM (
            SELECT *,
                   LAG(data_hash) OVER (PARTITION BY tenantId, id ORDER BY ts) AS prev_data_hash
            FROM table_one_source_hash
        )
        -- keep a row only if its content hash differs from the previous row for the same key
        WHERE data_hash IS DISTINCT FROM prev_data_hash OR prev_data_hash IS NULL
    )

    SELECT * FROM entitlement_source_deduped
);

The goal is that only new rows, or rows whose id or segments differ from the previous row for the same key, flow downstream from table_one. The SQL above works, and it produces a DAG like this: [screenshot of the Flink job graph]

It uses: OverAggregate(partitionBy=[$5, $6], orderBy=[ts ASC], window=[RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[segments, kafka_key, ts, $3, $4, $5, $6, $7, LAG($7) AS w0$o0]). The window for this OverAggregate appears to be unbounded, and I am worried that this operator's state can grow very large.

Question: Is there another way to deduplicate based on the content of the message?

Upvotes: 0

Views: 80

Answers (1)

Roman Boyko

Reputation: 21

If you don't need to output prev_data_hash, you can try Flink's deduplication SQL (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/deduplication/). It will create a RankOperator with a Top1Function, whose state is still unbounded (like the OverAggregate in your example), but it will probably be a bit more efficient (especially if a primary key is defined on the source table).
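For reference, here is a minimal sketch of that deduplication pattern applied to the table_one_source_hash CTE from the question (the row_num alias and the choice to partition by the content hash are assumptions, not part of the original query):

SELECT pod, org, tenantId, pk, id, segments, ts, data_hash
FROM (
    SELECT *,
           -- Flink recognizes ROW_NUMBER() ... WHERE row_num = 1 as deduplication
           ROW_NUMBER() OVER (
               PARTITION BY tenantId, id, data_hash
               ORDER BY ts ASC
           ) AS row_num
    FROM table_one_source_hash
)
WHERE row_num = 1;

Note one semantic difference from the LAG version: because the first row per (tenantId, id, data_hash) has already been emitted, a key whose content later flips back to an earlier hash stays suppressed instead of being re-emitted.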

Or you can configure state TTL to limit the size of the unbounded state: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#different-ways-to-configure-state-ttl
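For example (a sketch; the one-hour value is purely illustrative), in the SQL client you could set the pipeline-wide idle-state retention before submitting the query:

-- State for keys idle longer than the TTL is dropped, so a duplicate
-- arriving after expiry would be emitted again.
SET 'table.exec.state.ttl' = '1 h';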

Upvotes: 0
