Reputation: 55
The new Temporal Tables in Flink look awesome but I have not yet been able to make them work. As I cannot find any working examples I wonder if anyone else has got it to work and can point out what I'm doing wrong.
Here's a little bit of context:
query:
SELECT s.id FROM sitemembership AS m, LATERAL TABLE (site(m.ts)) AS s WHERE m.siteId = s.id
The setup:
// { "streamName": "sitemembership", "key": "siteId" }
Table table = tableEnv.fromDataStream(stream, String.join(",", rowTypeInfo.getFieldNames()) + ",ts.rowtime");
table.printSchema();
tableEnv.registerTable(streamName, table);
// { "streamName": "site", "key": "id" }
Table table = tableEnv.fromDataStream(stream, String.join(",", rowTypeInfo.getFieldNames()) + ",ts.rowtime");
TemporalTableFunction temporalTable = table.createTemporalTableFunction("ts", key);
tableEnv.registerFunction(streamName, temporalTable);
I'm getting no rows whatsoever and no errors. I've tried flipping the query by changing which table I register as temporal but with no success. I have also looked at the "ts" column and get dates that makes me believe I should get at least a few rows.
Any help is appreciated.
P.S. I'm running this on historical data from kafka partitioned on "id" which is also the row key
Upvotes: 2
Views: 1453
Reputation: 654
You can find fully working code "examples" in form of the tests here (Content of those two tests (processing time and event time) is more or less repeated in the documentation here and here or here). You can start with those examples and then step by step convert them to your exact use case/scenario. It might be beneficial to first start with pre-defined set of data and only later switch to reading from Kafka.
Regarding your issue, it is unclear from your code snippet what is wrong, some of the potential issues:
assignTimestampsAndWatermarks()
call in the linked testEventTimeInnerJoin()
). Temporal Join operator emits the data only on watermark.site
has no row that is old enough to be joined with sitemembership
records, the result will be empty. Like for example if all of the records from site
have time fields from year 2019
, while sitemembership
have only records from 2018
.Upvotes: 2