Ardelia Lortz

Reputation: 132

Understanding watermark behaviour when using Kafka connector and Flink SQL

I have a problem understanding how watermarks are assigned when using the Kafka connector with Flink SQL. I inserted messages into a Kafka topic in the following order:

1. A,2023-07-08 09:00:00,2023-07-08,1.0 
2. A,2023-07-08 09:00:01,2023-07-08,2.0
3. A,2023-07-08 09:02:01,2023-07-08,2.0
4. A,2023-07-08 09:03:00,2023-07-08,4.0
5. A,2023-07-08 09:03:01,2023-07-08,5.0
6. A,2023-07-08 09:04:01,2023-07-08,6.0
7. A,2023-07-08 09:03:10,2023-07-08,5.35

Now I have a source:

CREATE TEMPORARY TABLE source_kafka_test (
  `symbol` VARCHAR(2147483647),
  `msg_ts` TIMESTAMP(3),
  `trade_date` VARCHAR(2147483647),
  `trade` DOUBLE,
  WATERMARK FOR `msg_ts` AS `msg_ts` - INTERVAL '1' MINUTES
)
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = '',
  'topic' = '', 
  'properties.group.id' = '',
  'scan.startup.mode' = 'earliest-offset'
);

Now I order the events by msg_ts (the watermark column) and run a simple select:

CREATE TEMPORARY VIEW raw_data_sorted AS (
    SELECT
        symbol,
        msg_ts,
        trade_date,
        trade
    FROM source_kafka_test
    ORDER BY msg_ts ASC
);

SELECT * FROM raw_data_sorted;
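Conceptually, an event-time ORDER BY buffers incoming rows and releases only those whose timestamp is at or below the current watermark, in timestamp order. A toy Python sketch of that behaviour (a simplified model, not Flink's actual implementation):

```python
from datetime import datetime

# Toy model of an event-time sort: rows are buffered, and only rows with
# timestamp <= current watermark are released, sorted by timestamp.
def emit_sorted(buffer, watermark):
    ready = sorted(ts for ts in buffer if ts <= watermark)
    remaining = [ts for ts in buffer if ts > watermark]
    return ready, remaining

buffer = [
    datetime(2023, 7, 8, 9, 0, 0),
    datetime(2023, 7, 8, 9, 4, 1),
    datetime(2023, 7, 8, 9, 0, 1),
]
watermark = datetime(2023, 7, 8, 9, 3, 0, 999000)
ready, pending = emit_sorted(buffer, watermark)
# ready holds the two 09:00:xx rows; the 09:04:01 row stays buffered
# until a larger watermark arrives.
```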

My results:

symbol  msg_ts              trade_date  trade
            
A       2023-07-08T09:00    2023-07-08  1.0
A       2023-07-08T09:00:01 2023-07-08  2.0
A       2023-07-08T09:02:01 2023-07-08  2.0

I run this query on preexisting data in the Kafka topic (all entries exist in Kafka before the query is started).

It seems the watermark value is taken from the last entry (2023-07-08 09:03:10), and because of this only the first 3 entries are printed (the others would be printed after the watermark advances).

When I instead start the Flink SQL job first and then insert the same messages into Kafka, all of them are printed (the current watermark is 2023-07-08 09:04:01, which is aligned with my understanding).

The question is: why does the current watermark value differ depending on when the query is executed (after the data is inserted vs. before)?

This leads to non-deterministic results when running the query a second time: for example, some late events are considered non-late and are taken into account in calculations, or some windows may never be closed (or may be closed late) if the last message in Kafka was the last trade of the day.

Upvotes: 0

Views: 229

Answers (1)

David Anderson

Reputation: 43697

After ingesting the 7 events in your example, the watermark will be the largest timestamp (2023-07-08 09:04:01) minus 1 minute, minus 1 millisecond -- or 2023-07-08 09:03:00.999. Any events later than this watermark won't be emitted by the sorting query until a larger watermark comes along.
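The arithmetic above follows Flink's bounded-out-of-orderness strategy: the watermark is the largest timestamp seen so far, minus the declared delay, minus 1 millisecond. A quick check in Python against the seven events from the question:

```python
from datetime import datetime, timedelta

# Watermark for `WATERMARK FOR msg_ts AS msg_ts - INTERVAL '1' MINUTES`:
# watermark = max(event time seen) - delay - 1 ms.
event_times = [
    "2023-07-08 09:00:00", "2023-07-08 09:00:01", "2023-07-08 09:02:01",
    "2023-07-08 09:03:00", "2023-07-08 09:03:01", "2023-07-08 09:04:01",
    "2023-07-08 09:03:10",
]
ts = [datetime.fromisoformat(t) for t in event_times]
delay = timedelta(minutes=1)
watermark = max(ts) - delay - timedelta(milliseconds=1)
print(watermark)  # 2023-07-08 09:03:00.999000
```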

When a Flink job ends cleanly, Flink sends through one final watermark with a huge timestamp, which triggers the emission of any remaining (buffered) rows.
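That final flush can be modeled the same way: the terminal watermark is effectively the maximum representable timestamp, so every buffered row is "on time" and gets emitted. A small sketch (using `datetime.max` as a stand-in for Flink's internal maximum watermark):

```python
from datetime import datetime

# On clean shutdown, the final watermark is the maximum possible timestamp,
# so no buffered row can be later than it -- everything is flushed in order.
FINAL_WATERMARK = datetime.max  # stand-in for Flink's Long.MAX_VALUE watermark

buffered = [
    datetime(2023, 7, 8, 9, 3, 1),
    datetime(2023, 7, 8, 9, 4, 1),
    datetime(2023, 7, 8, 9, 3, 10),
]
flushed = sorted(t for t in buffered if t <= FINAL_WATERMARK)
# All three rows are released, sorted by timestamp.
```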

I'm not sure this explains everything you reported, but hopefully it helps.

Upvotes: 0
