I'm having trouble understanding how watermarks are assigned when using the Kafka connector with Flink SQL. I inserted messages into a Kafka topic in the following order:
1. A,2023-07-08 09:00:00,2023-07-08,1.0
2. A,2023-07-08 09:00:01,2023-07-08,2.0
3. A,2023-07-08 09:02:01,2023-07-08,2.0
4. A,2023-07-08 09:03:00,2023-07-08,4.0
5. A,2023-07-08 09:03:01,2023-07-08,5.0
6. A,2023-07-08 09:04:01,2023-07-08,6.0
7. A,2023-07-08 09:03:10,2023-07-08,5.35
Then I defined a source table:
CREATE TEMPORARY TABLE source_kafka_test (
    `symbol` VARCHAR(2147483647),
    `msg_ts` TIMESTAMP(3),
    `trade_date` VARCHAR(2147483647),
    `trade` DOUBLE,
    WATERMARK FOR `msg_ts` AS `msg_ts` - INTERVAL '1' MINUTES
)
WITH (
    'connector' = 'kafka',
    'format' = 'csv',
    'properties.bootstrap.servers' = '',
    'topic' = '',
    'properties.group.id' = '',
    'scan.startup.mode' = 'earliest-offset'
);
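As a rough illustration (plain Python, not Flink code), the WATERMARK clause evaluates `msg_ts - INTERVAL '1' MINUTES` for every row, producing one candidate watermark per row. The sketch below computes those candidates for the seven messages in arrival order. The names `MESSAGES`, `parse` and `watermark_candidates` are illustrative helpers, not part of any Flink API.

```python
from datetime import datetime, timedelta

# The seven CSV messages from the question, in Kafka (insertion) order.
MESSAGES = [
    "A,2023-07-08 09:00:00,2023-07-08,1.0",
    "A,2023-07-08 09:00:01,2023-07-08,2.0",
    "A,2023-07-08 09:02:01,2023-07-08,2.0",
    "A,2023-07-08 09:03:00,2023-07-08,4.0",
    "A,2023-07-08 09:03:01,2023-07-08,5.0",
    "A,2023-07-08 09:04:01,2023-07-08,6.0",
    "A,2023-07-08 09:03:10,2023-07-08,5.35",
]

DELAY = timedelta(minutes=1)  # mirrors INTERVAL '1' MINUTES in the DDL


def parse(line):
    """Split one CSV message into (symbol, msg_ts, trade_date, trade)."""
    symbol, ts, trade_date, trade = line.split(",")
    return symbol, datetime.strptime(ts, "%Y-%m-%d %H:%M:%S"), trade_date, float(trade)


def watermark_candidates(lines):
    """Evaluate msg_ts - 1 minute for each row, in arrival order."""
    return [parse(line)[1] - DELAY for line in lines]


for candidate in watermark_candidates(MESSAGES):
    print(candidate)
```

The last candidate (09:02:10) is smaller than the one before it (09:03:01) because message 7 arrives out of order.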
Next, I order the events by msg_ts (the watermark column) and run a simple select:
create temporary view raw_data_sorted AS (
    select
        symbol,
        msg_ts,
        trade_date,
        trade
    FROM source_kafka_test
    order by msg_ts ASC
);
select * from raw_data_sorted;
My results:
symbol msg_ts trade_date trade
A 2023-07-08T09:00 2023-07-08 1.0
A 2023-07-08T09:00:01 2023-07-08 2.0
A 2023-07-08T09:02:01 2023-07-08 2.0
I ran this query on pre-existing data in the Kafka topic (all entries were already in Kafka before the query started).
It seems the watermark value is taken from the last entry (2023-07-08 09:03:10), so only the first 3 entries are printed (the others will be printed once the watermark advances).
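The hypothesis above can be checked with a small simulation (plain Python, not Flink code). It assumes the sorting query releases buffered rows whose timestamp is at or below the current watermark, in ascending order, and keeps the rest buffered. The helper `emitted_by_sort` is illustrative.

```python
from datetime import datetime, timedelta

# msg_ts of the seven messages, in the order they were written to Kafka.
TIMESTAMPS = [datetime(2023, 7, 8, 9, m, s) for m, s in
              [(0, 0), (0, 1), (2, 1), (3, 0), (3, 1), (4, 1), (3, 10)]]
DELAY = timedelta(minutes=1)


def emitted_by_sort(timestamps, watermark):
    """Rows a watermark-driven ORDER BY could release: ts <= watermark, ascending.
    Everything later stays buffered until a larger watermark arrives."""
    return sorted(ts for ts in timestamps if ts <= watermark)


# Hypothesis from the question: the watermark comes from the *last* entry.
wm_last = TIMESTAMPS[-1] - DELAY  # 2023-07-08 09:02:10
for ts in emitted_by_sort(TIMESTAMPS, wm_last):
    print(ts)
```

With a watermark of 09:02:10, exactly the three rows shown in the results above are released.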
When I start the Flink SQL job first and then insert the same messages into Kafka, all of them are printed (the current watermark is 2023-07-08 09:04:01, which aligns with my understanding).
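One way both observations could arise is sketched in the hypothetical model below (plain Python, not Flink's implementation). It assumes, as the question suspects, that the watermark generator remembers the candidate of the most recently read row rather than the maximum, emits it periodically, and that downstream ignores any watermark lower than the one it already has. With a backlog, all seven rows may be read before the first periodic emit; when the job starts first, an emit can happen between rows. The function `observed_watermark` and its `rows_per_emit` parameter are illustrative.

```python
from datetime import datetime, timedelta

TIMESTAMPS = [datetime(2023, 7, 8, 9, m, s) for m, s in
              [(0, 0), (0, 1), (2, 1), (3, 0), (3, 1), (4, 1), (3, 10)]]
DELAY = timedelta(minutes=1)


def observed_watermark(timestamps, rows_per_emit):
    """Hypothetical model: keep only the latest row's candidate (msg_ts - 1 min),
    emit it every `rows_per_emit` rows (and after the last row), and let
    downstream ignore any watermark lower than the highest one seen so far."""
    current = None   # candidate computed from the most recently read row
    observed = None  # highest watermark seen downstream so far
    for i, ts in enumerate(timestamps, start=1):
        current = ts - DELAY
        if i % rows_per_emit == 0 or i == len(timestamps):
            if observed is None or current > observed:
                observed = current
    return observed


# Backlog already in Kafka: all 7 rows are read before the first emit.
print(observed_watermark(TIMESTAMPS, rows_per_emit=len(TIMESTAMPS)))  # 2023-07-08 09:02:10
# Job started first, rows trickle in: an emit happens after every row.
print(observed_watermark(TIMESTAMPS, rows_per_emit=1))                # 2023-07-08 09:03:01
```

Under these assumptions, the second scenario ends at 09:03:01 (the largest timestamp, 09:04:01, minus one minute), while the backlog scenario gets stuck at 09:02:10.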
My question is: why does the current watermark differ depending on when the query is executed (after the data is inserted vs. before)?
This makes the results non-deterministic when the query is run a second time. For example, some late events are treated as on time and included in calculations, and some windows may never close (or may close late) if the last message in Kafka was the last trade of the day.
After ingesting the 7 events in your example, the watermark will be the largest timestamp (2023-07-08 09:04:01) minus 1 minute, minus 1 millisecond, i.e. 2023-07-08 09:03:00.999. Any rows with timestamps later than this watermark won't be emitted by the sorting query until a larger watermark comes along.
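The arithmetic above can be checked with a short simulation (plain Python, not Flink code). It applies the rule "largest timestamp minus 1 minute minus 1 millisecond" and splits the rows into those the sort could release and those that stay buffered, assuming rows at or below the watermark are released.

```python
from datetime import datetime, timedelta

TIMESTAMPS = [datetime(2023, 7, 8, 9, m, s) for m, s in
              [(0, 0), (0, 1), (2, 1), (3, 0), (3, 1), (4, 1), (3, 10)]]
DELAY = timedelta(minutes=1)
ONE_MS = timedelta(milliseconds=1)

# Watermark as described above: largest timestamp - 1 minute - 1 ms.
watermark = max(TIMESTAMPS) - DELAY - ONE_MS  # 2023-07-08 09:03:00.999

released = sorted(ts for ts in TIMESTAMPS if ts <= watermark)
buffered = sorted(ts for ts in TIMESTAMPS if ts > watermark)
print("watermark:", watermark)
print("released:", [ts.strftime("%H:%M:%S") for ts in released])
print("buffered:", [ts.strftime("%H:%M:%S") for ts in buffered])
```

Under this rule four rows (09:00:00, 09:00:01, 09:02:01 and 09:03:00) would be released, whereas the question reports three.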
When a Flink job ends cleanly, Flink sends one final watermark with a huge timestamp (Long.MAX_VALUE). This triggers the emission of any remaining (buffered) rows.
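The effect of that final watermark on the sort buffer can be sketched as follows (plain Python, not Flink code). Python's `datetime.max` stands in for Flink's end-of-input watermark, and the first watermark (09:02:10) is only an example taken from the question. The helper `release` is illustrative.

```python
from datetime import datetime

TIMESTAMPS = [datetime(2023, 7, 8, 9, m, s) for m, s in
              [(0, 0), (0, 1), (2, 1), (3, 0), (3, 1), (4, 1), (3, 10)]]
# Stand-in for the final end-of-input watermark (Long.MAX_VALUE in Flink).
FINAL_WATERMARK = datetime.max


def release(buffer, watermark):
    """Split a sort buffer into (rows released in ascending order, rows still buffered)."""
    released = sorted(ts for ts in buffer if ts <= watermark)
    remaining = [ts for ts in buffer if ts > watermark]
    return released, remaining


# A regular watermark releases part of the buffer...
first, buffer = release(TIMESTAMPS, datetime(2023, 7, 8, 9, 2, 10))
# ...and the final watermark flushes everything that is left.
rest, buffer = release(buffer, FINAL_WATERMARK)
print(len(first), len(rest), len(buffer))
```

Because every row is at or below the final watermark, the buffer ends up empty and all rows are emitted in timestamp order.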
I'm not sure this explains everything you reported, but hopefully it helps.