Reputation: 11
I am using Flink 1.14, deployed with the Lyft Flink operator.
I am trying to compute a tumbling-window aggregate with the Table API: read from the transactions source table and write the per-window aggregate results to a new Kafka topic.
My source is a Kafka topic populated by Debezium.
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);
//this is the source
tEnv.executeSql("CREATE TABLE transactions (\n" +
" event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n"+
" transaction_time AS TO_TIMESTAMP_LTZ(4001, 3),\n"+
" id INT PRIMARY KEY,\n" +
" transaction_status STRING,\n" +
" transaction_type STRING,\n" +
" merchant_id INT,\n" +
" WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'debezium-json.schema-include' = 'true' ,\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'dbserver1.inventory.transactions',\n" +
" 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.kafka.svc:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n"+
" 'format' = 'debezium-json'\n" +
")");
I apply the tumble window and count the ids per window and transaction status with:
public static Table report(Table transactions) {
    return transactions
        .window(Tumble.over(lit(2).minutes()).on($("transaction_time")).as("w"))
        .groupBy($("w"), $("transaction_status"))
        .select(
            $("w").start().as("window_start"),
            $("w").end().as("window_end"),
            $("transaction_status"),
            $("id").count().as("id_count"));
}
The sink is:
tEnv.executeSql("CREATE TABLE my_report (\n" +
"  window_start TIMESTAMP(3),\n" +
"  window_end TIMESTAMP(3),\n" +
"  transaction_status STRING,\n" +
"  id_count BIGINT,\n" +
"  PRIMARY KEY (window_start) NOT ENFORCED\n" +
") WITH (\n" +
"  'connector' = 'upsert-kafka',\n" +
"  'topic' = 'dbserver1.inventory.my-window-sink',\n" +
"  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.kafka.svc:9092',\n" +
"  'properties.group.id' = 'testGroup',\n" +
"  'key.format' = 'json',\n" +
"  'value.format' = 'json'\n" +
")");
Table transactions = tEnv.from("transactions");
Table merchants = tEnv.from("merchants");
report(transactions).executeInsert("my_report");
The problem is that when I consume dbserver1.inventory.my-window-sink with
kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.inventory.my-window-sink --from-beginning
I don't get any results. I waited 2 minutes (the window size), inserted into the transactions table, then waited another 2 minutes and inserted again, and still got no results.
I don't know whether the problem is with my watermark.
I am running with parallelism: 2.
On the Flink dashboard UI, in the details of the GroupWindowAggregate task, I can see that Records Received increases when I insert into the table, but I still see no results when I consume the topic!
Upvotes: 0
Views: 794
Reputation: 11
In addition to what David thankfully answered, I was missing table.exec.source.idle-timeout in the configuration of the streaming environment. This option controls when a source is marked idle; its default value of 0 means idleness is never detected. I set it to 1000 ms and that fixed it: the idle source is detected and watermarks are then generated properly. This probably won't affect regular streams with a steady flow of messages, but it was the issue in my case, since I was inserting records manually and the stream was therefore idle much of the time.
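A minimal sketch of setting this option on the tEnv from the question (the 1000 ms value is just the one that worked for me; tune it to your ingestion pattern):

```java
// Mark a source partition idle after 1 s without records, so the watermark
// can still advance based on the remaining active partitions.
tEnv.getConfig().getConfiguration()
    .setString("table.exec.source.idle-timeout", "1000 ms");
```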
Upvotes: 1
Reputation: 43697
With this line
transaction_time AS TO_TIMESTAMP_LTZ(4001, 3)
you have given every event the same constant transaction time (the fixed instant 4001 ms after the epoch), and with
WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
you have arranged for the watermarks to depend on the transaction_time. With this arrangement, time is standing still, and the windows can never close.
As for "I wait 2 minutes (the window size)," this isn't how event time processing works. Assuming the timestamps and watermarks were actually moving forward, you would need to wait however long it takes to process 2 minutes worth of data.
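Assuming the intent was to window on the actual event time, one possible fix (a sketch, reusing the column names from the question's DDL) is to drop the constant computed column and attach the watermark to the Debezium metadata timestamp that is already declared:

```sql
event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
```

and then window on $("event_time") in report(). With timestamps that actually move forward, each 2-minute window can close once the watermark passes the window end.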
Upvotes: 1