Reputation: 89
I'm trying to implement an event time temporal join in Flink. Here's the first join table:
tEnv.executeSql("CREATE TABLE AggregatedTrafficData_Kafka (" +
"`timestamp` TIMESTAMP_LTZ(3)," +
"`area` STRING," +
"`networkEdge` STRING," +
"`vehiclesNumber` BIGINT," +
"`averageSpeed` INTEGER," +
"WATERMARK FOR `timestamp` AS `timestamp`" +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'seneca.trafficdata.aggregated'," +
"'properties.bootstrap.servers' = 'localhost:9092'," +
"'properties.group.id' = 'traffic-data-aggregation-job'," +
"'format' = 'json'," +
"'json.timestamp-format.standard' = 'ISO-8601'" +
")");
The table is used as a sink for the following query:
Table aggregatedTrafficData = trafficData
.window(Slide.over(lit(30).seconds())
.every(lit(15).seconds())
.on($("timestamp"))
.as("w"))
.groupBy($("w"), $("networkEdge"), $("area"))
.select(
$("w").end().as("timestamp"),
$("area"),
$("networkEdge"),
$("plate").count().as("vehiclesNumber"),
$("speed").avg().as("averageSpeed")
);
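The aggregation result is then written into that sink table with something along the lines of:
aggregatedTrafficData.executeInsert("AggregatedTrafficData_Kafka");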
Here's the other join table. I use Debezium to stream a Postgres table into Kafka:
tEnv.executeSql("CREATE TABLE TransportNetworkEdge_Kafka (" +
"`timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'value.source.timestamp' VIRTUAL," +
"`urn` STRING," +
"`flow_rate` INTEGER," +
"PRIMARY KEY(`urn`) NOT ENFORCED," +
"WATERMARK FOR `timestamp` AS `timestamp`" +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'seneca.network.transport_network_edge'," +
"'scan.startup.mode' = 'latest-offset'," +
"'properties.bootstrap.servers' = 'localhost:9092'," +
"'properties.group.id' = 'traffic-data-aggregation-job'," +
"'format' = 'debezium-json'," +
"'debezium-json.schema-include' = 'true'" +
")");
Finally, here's the temporal join:
Table transportNetworkCongestion = tEnv.sqlQuery("SELECT AggregatedTrafficData_Kafka.`timestamp`, `networkEdge`, " +
"congestion(`vehiclesNumber`, `flow_rate`) AS `congestion` FROM AggregatedTrafficData_Kafka " +
"JOIN TransportNetworkEdge_Kafka FOR SYSTEM_TIME AS OF AggregatedTrafficData_Kafka.`timestamp` " +
"ON AggregatedTrafficData_Kafka.`networkEdge` = TransportNetworkEdge_Kafka.`urn`");
The problem I'm having is that the join works only for the first few seconds (after an update in the Postgres table), but I need to continuously join the first table with the Debezium one. Am I doing something wrong? Thanks, euks
Upvotes: 0
Views: 1225
Reputation: 43454
Temporal joins using the AS OF syntax you're using require:
- an append-only table with a valid event-time attribute
- an updating table with a primary key and a valid event-time attribute
- an equality predicate on the primary key
When Flink SQL's temporal operators are applied to event time streams, watermarks play a critical role in determining when results are produced, and when the state is cleared.
When performing a temporal join:
- rows from the append-only table are buffered in Flink state until the current watermark of the join operator reaches their timestamps
- for the versioned table, for each key the latest version whose timestamp precedes the join operator's current watermark is kept in state, plus any versions from after the current watermark
- whenever the join operator's watermark advances, new results are produced
The join operator tracks the watermarks it receives from its input channels, and its current watermark is always the minimum of these two watermarks. This is why your join stalls, and only makes progress when the flow_rate is updated.
One way to fix this would be to set the watermark for the TransportNetworkEdge_Kafka table with something like this:
WATERMARK FOR `timestamp` AS CAST('9999-12-31 00:00:00.000' AS TIMESTAMP(3))
This sets the watermark for this table/stream to a very large value, making the watermarks from this stream irrelevant -- this stream's watermarks will never be the smallest, so they will never hold back the join.
This will, however, have the drawback of making the join results non-deterministic.
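For reference, applied to the DDL from your question the change only touches the WATERMARK clause (everything else stays the same); depending on your Flink version you may need to cast to TIMESTAMP_LTZ(3) instead, so that the watermark type matches the `timestamp` column:
tEnv.executeSql("CREATE TABLE TransportNetworkEdge_Kafka (" +
"`timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'value.source.timestamp' VIRTUAL," +
"`urn` STRING," +
"`flow_rate` INTEGER," +
"PRIMARY KEY(`urn`) NOT ENFORCED," +
"WATERMARK FOR `timestamp` AS CAST('9999-12-31 00:00:00.000' AS TIMESTAMP(3))" +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'seneca.network.transport_network_edge'," +
"'scan.startup.mode' = 'latest-offset'," +
"'properties.bootstrap.servers' = 'localhost:9092'," +
"'properties.group.id' = 'traffic-data-aggregation-job'," +
"'format' = 'debezium-json'," +
"'debezium-json.schema-include' = 'true'" +
")");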
Upvotes: 3