VictorGram

Reputation: 2661

Kafka consumer API: failing to manually read from an offset and acknowledge with kafka-consumer-api

My use case is to use the Kafka consumer API so that we can manually read from the offset of the last successfully processed record of a Kafka topic, and then manually acknowledge each successfully processed record back to Kafka (this is to reduce data loss). However, with my current implementation, the program moves forward and reads from the next offset even when I comment out 'ack.acknowledge()'. I am new to Kafka and implemented my consumer as shown below (we are using Spring Boot).

The problem: even if I comment out ack.acknowledge(), the offset is still updated and the consumer reads from the next offset, which is unexpected (to my understanding so far).

Consumer config [note that I set ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false and call factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)]:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Autowired
    private AdapterStreamProperties appProperties;

    @Value("${spring.kafka.streams.properties.receive.buffer.bytes}")
    private String receiveBufferBytes;

    @Bean
    public ConsumerFactory<PreferredMediaMsgKey, SendEmailCmd> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appProperties.getApplicationId());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appProperties.getBootstrapServers());
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                "adapter");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.PreferredMediaMsgKeyDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.SendEmailCmdDeserializer");
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, receiveBufferBytes);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

Then my consumer consumes this way [even if I comment out 'ack.acknowledge()', it still reads from the next offset next time]:

  @KafkaListener(topics = Constants.INPUT_TOPIC, groupId = "adapter")
  public void listen(ConsumerRecord<PreferredMediaMsgKey, SendEmailCmd> record,
                     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                     @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) {

    System.out.println("----------------------------------------------------------------------------");
    System.out.println("Reading from Offset: " +  offset + ", and Partition: " + partition);
    System.out.println("Record for this pertition: Key : "+ record.key() + ", Value : " +   record.value());
    System.out.println("----------------------------------------------------------------------------");
    NotificationProcessedFinal result = processor.processEmail(record.key(),record.value());

    if( StringUtils.isNotEmpty(result.getErrorCmd().getErrorMsg())) {
      kafkaErrorProducerTemplate.send(adapterMsgProperties.getErrorTopic(), record.key(), result.getErrorCmd());
    }
    else {
      kafkaOutputProducerTemplate.send(adapterMsgProperties.getOutputTopic(), record.key(), result.getNotifyEmailCmd());
    }
    ack.acknowledge();
  }

The Kafka API versions from my build.gradle:

// Kafka dependencies
implementation      'org.apache.kafka:kafka-streams:2.0.1'
implementation      'org.apache.kafka:kafka-clients:2.0.1'

Any insight will be helpful.

Thanks in advance

Upvotes: 1

Views: 1105

Answers (1)

snap

Reputation: 1925

ack.acknowledge()

only means that you commit, i.e. you record in Kafka that your consumer has successfully processed the message.

This updates the consumer group offset. If your application stopped without acking, the current offset (of the last messages you processed but did not acknowledge) would not be committed to Kafka.

A new consumer in the same consumer group (and with the same assigned partitions) would then consume those messages again, because the application reads its starting point from the consumer group offsets.

Whether or not you ack has no influence on your currently running listener; it simply continues processing new messages (as long as no error happens; I guess a rebalancing could also be a source of offset reloading, though I'm not completely sure).
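
To see this distinction in isolation, here is a minimal sketch using the plain kafka-clients API (the broker address localhost:9092, topic demo-topic and group demo-group are placeholder assumptions, not from your setup). The consumer's in-memory position advances with every record returned by poll(), while the consumer group offset stored in Kafka only moves on commit, which is what ack.acknowledge() triggers under MANUAL_IMMEDIATE:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PositionVsCommittedDemo {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // manual commits only
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");       // so a fresh group sees old records
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      TopicPartition tp = new TopicPartition("demo-topic", 0); // placeholder topic
      consumer.assign(Collections.singletonList(tp));

      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
      for (ConsumerRecord<String, String> record : records) {
        // position() advances with every record polled, with or without a commit ...
        System.out.println("in-memory position: " + consumer.position(tp));
        // ... but the group offset in Kafka only moves when we commit.
        OffsetAndMetadata committed = consumer.committed(tp);
        System.out.println("committed offset:   " + (committed == null ? "none" : committed.offset()));
      }
      // Only this call moves the group offset up to the current position.
      // Leaving it out is the equivalent of never calling ack.acknowledge():
      // this run still reads forward, but a restart re-reads the same records.
      consumer.commitSync();
    }
  }
}

Restart the program with the commitSync() line commented out and it re-reads the same records, while within a single run it keeps moving forward regardless, which matches what you observed. You can also watch the committed group offset from the outside with the kafka-consumer-groups.sh --describe tool.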

Upvotes: 2
