Reputation: 900
I have a Flink job running FlinkSQL with the following setup:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
env.setMaxParallelism(env.getParallelism() * 8);
// Interval at which watermarks are emitted; the value comes from my own job config object.
env.getConfig().setAutoWatermarkInterval(config.autowatermarkInterval());
final TableConfig tConfig = tEnv.getConfig();
// Keep idle state for 60 minutes before it becomes eligible for cleanup.
tConfig.setIdleStateRetention(Duration.ofMinutes(60));
// Mark a source as idle after 3 minutes (180000 ms) without records.
tConfig.getConfiguration().setString("table.exec.source.idle-timeout", "180000 ms");
To test this locally with a Kafka source, I sent a few events to the Flink job. The Flink UI shows that it produced one watermark. I then waited 3 minutes without sending new events (i.e., with an idle partition) to see whether the watermark would advance. It did not.
Note: I use a local Kafka broker with three partitions, and my test data is keyed, so every event goes to the same partition. Even though the other partitions are idle and I wait 3 minutes, I do not see the watermark advance.
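To make the expected behavior concrete, here is a toy model in plain Java. It assumes the combined watermark is the minimum over all partitions that are not marked idle, and that a partition is marked idle once it has been silent for the timeout. The class, method names, and sample values are hypothetical and only illustrate the idea; this is not Flink's actual implementation.

```java
import java.util.Arrays;

/** Toy model of combining per-partition watermarks; not Flink's implementation. */
class IdleWatermarkSketch {

    /** Value used here to mean "this partition has not produced a watermark yet". */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /**
     * Returns the minimum watermark over all partitions that are not idle,
     * or NO_WATERMARK if every partition is idle.
     */
    static long combinedWatermark(long[] partitionWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle partitions no longer hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, partitionWatermarks[i]);
        }
        return anyActive ? min : NO_WATERMARK;
    }

    /** Marks a partition idle once it has been silent for at least idleTimeoutMs. */
    static boolean[] detectIdle(long[] lastRecordTimeMs, long nowMs, long idleTimeoutMs) {
        boolean[] idle = new boolean[lastRecordTimeMs.length];
        for (int i = 0; i < lastRecordTimeMs.length; i++) {
            idle[i] = nowMs - lastRecordTimeMs[i] >= idleTimeoutMs;
        }
        return idle;
    }

    public static void main(String[] args) {
        // Partition 0 receives the keyed test events; partitions 1 and 2 never see a record.
        long[] watermarks = {1_000L, NO_WATERMARK, NO_WATERMARK};
        long[] lastRecordTimeMs = {170_000L, 0L, 0L};
        long idleTimeoutMs = 180_000L; // 3 minutes

        // Without idleness detection, the empty partitions hold the watermark back.
        boolean[] nobodyIdle = new boolean[3];
        System.out.println(combinedWatermark(watermarks, nobodyIdle) == NO_WATERMARK);

        // After the timeout, only the active partition contributes.
        boolean[] idle = detectIdle(lastRecordTimeMs, 180_000L, idleTimeoutMs);
        System.out.println(Arrays.toString(idle));
        System.out.println(combinedWatermark(watermarks, idle));
    }
}
```

In this model the watermark only moves once the two empty partitions are excluded, which is the effect I expected the idle-timeout setting to have.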
Is there anywhere in the job UI where I can see whether the 3-minute value I set was actually picked up? Am I using the right units (seconds vs. ms)?
Anything else I could check to test this setting?
We are running Flink 1.12.1.
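On the units question, a quick sanity check in plain Java shows that 180000 ms is indeed 3 minutes. The parser below is a hypothetical helper that only understands the "<number> ms" form used above; it is not Flink's own duration parser, which may accept other formats.

```java
import java.time.Duration;

/** Sanity check for the "180000 ms" value; the parser is a hypothetical helper. */
class IdleTimeoutUnits {

    /** Parses strings of the form "<number> ms" into a Duration. */
    static Duration fromMillisString(String value) {
        String trimmed = value.trim();
        if (!trimmed.endsWith("ms")) {
            throw new IllegalArgumentException("Expected a value ending in 'ms': " + value);
        }
        String number = trimmed.substring(0, trimmed.length() - 2).trim();
        return Duration.ofMillis(Long.parseLong(number));
    }

    public static void main(String[] args) {
        Duration timeout = fromMillisString("180000 ms");
        // Compare against the intended 3-minute timeout.
        System.out.println(timeout.equals(Duration.ofMinutes(3)));
    }
}
```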
Update: I see the following exception under Exceptions for my Flink SQL job. I wonder whether there is a version mismatch.
2021-10-26 16:38:14
java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest$PartitionData
at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$null$0(OffsetsForLeaderEpochClient.java:52)
at java.base/java.util.Optional.ifPresent(Unknown Source)
at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$prepareRequest$1(OffsetsForLeaderEpochClient.java:51)
at java.base/java.util.HashMap.forEach(Unknown Source)
at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:51)
at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:37)
at org.apache.kafka.clients.consumer.internals.AsyncClient.sendAsyncRequest(AsyncClient.java:37)
at org.apache.kafka.clients.consumer.internals.Fetcher.lambda$validateOffsetsAsync$5(Fetcher.java:798)
at java.base/java.util.HashMap.forEach(Unknown Source)
at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsAsync(Fetcher.java:774)
at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsIfNeeded(Fetcher.java:498)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2328)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)
Upvotes: 0
Views: 967
Reputation: 900
The issue was that this setting does not work in Flink 1.12.0 or 1.12.1. After I upgraded to Flink 1.13.2, the setting was honored and worked as expected.
The exception was a red herring and not consistently reproducible.
Upvotes: 2