Shrey

Reputation: 63

Unable to commit consumed offsets to Kafka on checkpoint with Flink's new Kafka consumer API (KafkaSource, Flink 1.14)

I am using the Flink 1.14 Kafka source connector with the code below.

My requirement is that consumed offsets are committed back to Kafka on every checkpoint, so that a restarted job resumes from the last committed offset without re-reading messages.

With the new Flink Kafka consumer API (KafkaSource) I am facing the following problems:

- Offsets are committed back to Kafka only after a delay of about 2-3 seconds, not immediately on checkpoint.
- When you kill the application manually within that 2-3 second window and restart it, the last consumed message has not been committed yet and is read twice (duplicate).

To cross-check this behaviour I tried Flink's older Kafka consumer API (FlinkKafkaConsumer). There it works as expected: as soon as a message is consumed, its offset is committed back to Kafka.

The steps I followed are in the test below. Please suggest if I am missing anything or if any property needs to be added.

import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.junit.Test;

public class FlinkKafkaStreamsTest {

    @Test
    public void test() throws Exception {

        System.out.println("FlinkKafkaStreamsTest started ..");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.enableCheckpointing(500);
        env.setParallelism(4);

        Properties propertiesOld = new Properties();
        Properties properties = new Properties();
        String inputTopic = "input_topic";
        String bootStrapServers = "localhost:29092";
        String groupId_older = "older_test1";
        String groupId = "test1";

        propertiesOld.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        propertiesOld.put(ConsumerConfig.GROUP_ID_CONFIG, groupId_older);
        propertiesOld.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);


        /******************** Old Kafka API **************/
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(inputTopic,
                new KRecordDes(),
                propertiesOld);
        flinkKafkaConsumer.setStartFromGroupOffsets();
        env.addSource(flinkKafkaConsumer).print("old-api");


        /******************** New Kafka API **************/
        KafkaSourceBuilder<String> sourceBuilder = KafkaSource.<String>builder()
                .setBootstrapServers(bootStrapServers)
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("enable.auto.commit", "false")
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setProperties(properties)
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST));

        KafkaSource<String> kafkaSource = sourceBuilder.build();

        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");

        source.print("new-api");

        env.execute();
    }

    static class KRecordDes implements KafkaDeserializationSchema<String> {
        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }
        @Override
        public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
            return new String(consumerRecord.value());
        }
    }
}

Note: I also have a requirement for a bounded Flink Kafka source reader in the same code, which is only available in the new API (KafkaSource).
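For reference, this is roughly what I mean by a bounded source with the new API; the stopping offsets used here are only an example, reusing the same topic and bootstrap variables as above:

    KafkaSource<String> boundedSource = KafkaSource.<String>builder()
            .setBootstrapServers(bootStrapServers)
            .setTopics(inputTopic)
            .setGroupId(groupId)
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setStartingOffsets(OffsetsInitializer.earliest())
            // setBounded makes the source stop once the given offsets are reached
            // (here: the offsets that are the latest at the moment the job starts),
            // so the job finishes instead of running forever
            .setBounded(OffsetsInitializer.latest())
            .build();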

Upvotes: 3

Views: 4458

Answers (1)

renqs

Reputation: 61

From the documentation of Kafka Source:

Note that Kafka source does NOT rely on committed offsets for fault tolerance. Committing offset is only for exposing the progress of consumer and consuming group for monitoring.

When the Flink job recovers from failure, instead of using the committed offsets on the broker, it will restore state from the latest successful checkpoint and resume consuming from the offset stored in that checkpoint, so records after that checkpoint will be "replayed" a little bit. Since you are using a print sink, which does not support exactly-once semantics, you will see duplicated records, which are in fact the records processed after the latest successful checkpoint.
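Also note that a job you kill and resubmit manually only restores from a checkpoint if the checkpoint is retained and you explicitly restart from it. A minimal sketch, assuming a local filesystem checkpoint directory (the path and the CLI invocation are just examples):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
    // keep completed checkpoints when the job is cancelled/killed,
    // so they can later be used as a restore point
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");

    // when resubmitting, start from the retained checkpoint, e.g.:
    // ./bin/flink run -s file:///tmp/flink-checkpoints/<job-id>/chk-<n> ...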

For the 2-3 second delay of the offset commit you mentioned, it is caused by the implementation of SourceReaderBase. In short, the SplitFetcher manages a task queue, and when an offset-commit task is pushed into the queue, it will not be executed until the currently running fetch task (a blocking KafkaConsumer#poll() call) returns or times out. The delay can be longer if the traffic is quite small. But note that this will not affect correctness: KafkaSource does not use committed offsets for fault tolerance.
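A heavily simplified illustration of that mechanism (not Flink's actual code): the fetcher thread works on one task at a time, so a commit task enqueued when the checkpoint completes can only run after the current blocking fetch has returned.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Hypothetical sketch of a single-threaded fetcher loop with a task queue.
    class FetcherLoopSketch {
        private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

        // Runs in the fetcher thread: tasks execute strictly one after another.
        void runLoop() throws InterruptedException {
            while (true) {
                Runnable task = taskQueue.take();
                task.run(); // a fetch task may block here for up to the poll timeout
            }
        }

        // Called when a checkpoint completes: the offset commit is only *enqueued*;
        // it executes after the fetch task that is currently running has finished.
        void enqueueOffsetCommit(Runnable commitTask) {
            taskQueue.offer(commitTask);
        }
    }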

Upvotes: 2
