Reputation: 1725
I am trying to implement manual offset commit for the messages received on Kafka. I have set enable.auto.commit to false, but the offset value keeps increasing.
I am not sure what the reason is and need help resolving the issue.
Below is the code
application.yml
spring:
application:
name: kafka-consumer-sample
resources:
cache:
period: 60m
kafka:
bootstrapServers: localhost:9092
options:
enable:
auto:
commit: false
KafkaConfig.java
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
KafkaConsumer.java
@Service
public class KafkaConsumer {

    @KafkaListener(topics = "#{'${kafka-consumer.topics}'.split(',')}", groupId = "${kafka-consumer.groupId}")
    public void consume(ConsumerRecord<String, String> record) {
        System.out.println("Consumed Kafka Record: " + record);
        record.timestampType();
        System.out.println("record.timestamp() = " + record.timestamp());
        System.out.println("***********************************");
        System.out.println(record.timestamp());
        System.out.println("record.key() = " + record.key());
        System.out.println("Consumed String Message : " + record.value());
    }
}
The output is as follows:
Consumed Kafka Record: ConsumerRecord(topic = test, partition = 0, offset = 31, CreateTime = 1573570989565, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 10)
record.timestamp() = 1573570989565
***********************************
1573570989565
record.key() = null
Consumed String Message : 10
Consumed Kafka Record: ConsumerRecord(topic = test, partition = 0, offset = 32, CreateTime = 1573570991535, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 11)
record.timestamp() = 1573570991535
***********************************
1573570991535
record.key() = null
Consumed String Message : 11
Properties are as follows.
auto.commit.interval.ms = 100000000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = mygroup
heartbeat.interval.ms = 3000
This is after I restart the consumer. I expected the earlier data to be printed as well.
Is my understanding correct? Please note that I am restarting my Spring Boot app and expecting the messages to start from the first one. My Kafka server and ZooKeeper are not terminated.
Upvotes: 7
Views: 13217
Reputation: 40088
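If auto commit is disabled with the ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG property, then you also have to set the acknowledgement mode at the container level to MANUAL and not acknowledge (commit) the offset, because the container's AckMode is BATCH by default. With BATCH, the container itself commits the offsets once the records returned by a poll have been processed.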
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // In newer Spring Kafka versions this enum lives at ContainerProperties.AckMode (see the Javadoc quoted below).
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    return factory;
}
This is because, when auto commit is disabled, the container-level acknowledgement mode defaults to BATCH. From the Javadoc:
public void setAckMode(ContainerProperties.AckMode ackMode)
Set the ack mode to use when auto ack (in the configuration properties) is false.
Parameters:
ackMode - the ContainerProperties.AckMode; default BATCH.
Several options are provided for committing offsets. If the enable.auto.commit consumer property is true, Kafka auto-commits the offsets according to its configuration. If it is false, the containers support several AckMode settings (described in the next list). The default AckMode is BATCH. Starting with version 2.3, the framework sets enable.auto.commit to false unless explicitly set in the configuration. Previously, the Kafka default (true) was used if the property was not set.
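To make the difference between BATCH and MANUAL concrete, here is a minimal sketch of the commit rule as a toy model. It is not the Spring Kafka API: AckModeSketch, ToyContainer and processBatch are hypothetical names invented for illustration, and the model simplifies the real container (it ignores partitions, rebalances and the exact timing of commits). It only shows that the in-memory position always advances, while the committed offset moves only when the ack mode says so.

```java
// Toy model only: every name here is hypothetical; this is NOT the Spring Kafka API.
final class AckModeSketch {
    enum AckMode { BATCH, MANUAL }

    /** Simulates a container that reads one partition and decides when to commit. */
    static final class ToyContainer {
        private final AckMode ackMode;
        private long position = 0;   // next offset to fetch; lives only in memory
        private long committed = 0;  // offset the group would resume from after a restart

        ToyContainer(AckMode ackMode) {
            this.ackMode = ackMode;
        }

        /** Delivers recordCount records to the listener, then applies the commit rule. */
        void processBatch(int recordCount, boolean listenerAcknowledges) {
            position += recordCount; // the position always advances while running
            // BATCH: the container commits on its own.
            // MANUAL: a commit happens only if the listener acknowledged.
            boolean commit = ackMode == AckMode.BATCH || listenerAcknowledges;
            if (commit) {
                committed = position;
            }
        }

        long position() { return position; }
        long committed() { return committed; }
    }

    public static void main(String[] args) {
        ToyContainer batch = new ToyContainer(AckMode.BATCH);
        batch.processBatch(2, false);
        ToyContainer manual = new ToyContainer(AckMode.MANUAL);
        manual.processBatch(2, false);
        System.out.println("BATCH  position=" + batch.position() + " committed=" + batch.committed());
        System.out.println("MANUAL position=" + manual.position() + " committed=" + manual.committed());
    }
}
```

In this model, after two records that the listener never acknowledges, the BATCH container has committed offset 2, while the MANUAL container has position 2 but its committed offset is still 0. That mirrors why the offsets printed by the listener keep increasing in both cases: the offset of each record is its position in the log, not the committed offset.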
If you always want to read from the beginning, set the auto.offset.reset property to earliest:
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Note: auto.offset.reset is only applied when the consumer group has no committed offset, so make sure the groupId is a new one that does not have any offsets stored in Kafka.
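The interaction between a committed offset and auto.offset.reset can be sketched as a small decision function. This is a toy model under stated assumptions, not the Kafka client API: OffsetResetSketch, Reset and startOffset are hypothetical names, and the log boundaries used in main (start 0, end 33) are assumed values chosen to resemble the question's output, where the last printed record had offset 32. The model also ignores the case where a committed offset is out of range.

```java
import java.util.OptionalLong;

// Toy model only: hypothetical names, not the Kafka client API.
final class OffsetResetSketch {
    enum Reset { EARLIEST, LATEST }

    /**
     * Returns the offset a consumer in this model starts reading from.
     * A committed offset always wins; the reset policy is only a fallback.
     */
    static long startOffset(OptionalLong committed, Reset reset, long logStart, long logEnd) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        return reset == Reset.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        long logStart = 0;  // assumed first offset in the partition
        long logEnd = 33;   // assumed next offset to be written

        // Existing group that already committed offset 33: "earliest" is not consulted.
        System.out.println(startOffset(OptionalLong.of(33), Reset.EARLIEST, logStart, logEnd));
        // New group without a committed offset: "earliest" starts at the log start.
        System.out.println(startOffset(OptionalLong.empty(), Reset.EARLIEST, logStart, logEnd));
        // New group without a committed offset: "latest" starts at the log end.
        System.out.println(startOffset(OptionalLong.empty(), Reset.LATEST, logStart, logEnd));
    }
}
```

In this model, a group that has already committed an offset resumes from it after a restart regardless of the reset policy, which is why a fresh groupId is needed to replay the topic from the start.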
Upvotes: 7