Reputation: 1196
I am currently using Spring Integration Kafka to make real-time statistics. Though, the group name makes Kafka search all the previous values the listener didn't read.
@Value("${kafka.consumer.group.id}")
private String consumerGroupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}
public Map<String, Object> getDefaultProperties() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return properties;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public KafkaMessageListener listener() {
return new KafkaMessageListener();
}
I would like to begin to the latest offset, and not be bothered by old values. Is there a possibility to reset the offset of the group ?
Upvotes: 19
Views: 49868
Reputation: 1170
Another way, we can always consume lastest message without commit group offset , by specify properties value with {"enable.auto.commit:false", "auto.offset.reset:latest"}
for KafkaListener annotation.
@KafkaListener(id = "example-group",
properties = {"enable.auto.commit:false", "auto.offset.reset:latest"},
topics = "example")
Upvotes: 6
Reputation: 13218
For a new consumer group that doesn't have an initial offset in kafka, you can set AUTO_OFFSET_RESET_CONFIG
:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
For an existing consumer group, you can:
consumer-group-id-v2
ConsumerSeekAware
so you can seek to desired offset during initialization See docsUpvotes: 1
Reputation: 133
You can set a ConsumerRebalanceListener
for the kafka consumer while you subscribing to some topics,in which you can get the lastest offset of each partition by KafkaConsumer.endOffsets()
method, and set this to consumer by KafkaConsumer.seek()
method ,like this:
kafkaConsumer.subscribe(Collections.singletonList(topics),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
//do nothing
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//get and set the lastest offset for each partiton
kafkaConsumer.endOffsets(partitions)
.forEach((partition, offset) -> kafkaConsumer.seek(partition, offset));
}
}
);
Upvotes: 2
Reputation: 67
you can use partitionOffsets annotation to start with exact offset,for example:
@KafkaListener(id = "bar", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})public void listen(ConsumerRecord<?, ?> record) {
}
Upvotes: 1
Reputation: 1196
Because I didn't saw any example of this, I'm gonna explain how I did here.
The class of your @KafkaListener
must implement a ConsumerSeekAware
class, which will permit to the listener to control the offset seeking when partitions are attributed. (source : https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek )
public class KafkaMessageListener implements ConsumerSeekAware {
@KafkaListener(topics = "your.topic")
public void listen(byte[] payload) {
// ...
}
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
}
Here, on a rebalance, we use the given callback to seek the last offset for all the given topics. Thanks to Artem Bilan ( https://stackoverflow.com/users/2756547/artem-bilan ) for guiding me to the answer.
Upvotes: 35