bachrc
bachrc

Reputation: 1196

Spring Kafka - How to reset offset to latest with a group id?

I am currently using Spring Integration Kafka to make real-time statistics. Though, the group name makes Kafka search all the previous values the listener didn't read.

@Value("${kafka.consumer.group.id}")
private String consumerGroupId;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}

public Map<String, Object> getDefaultProperties() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return properties;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaMessageListener listener() {
    return new KafkaMessageListener();
}

I would like to begin to the latest offset, and not be bothered by old values. Is there a possibility to reset the offset of the group ?

Upvotes: 19

Views: 49868

Answers (5)

bovenson
bovenson

Reputation: 1170

Another way, we can always consume lastest message without commit group offset , by specify properties value with {"enable.auto.commit:false", "auto.offset.reset:latest"} for KafkaListener annotation.

@KafkaListener(id = "example-group",
        properties = {"enable.auto.commit:false", "auto.offset.reset:latest"},
        topics = "example")

Upvotes: 6

Ahmad Abdelghany
Ahmad Abdelghany

Reputation: 13218

For a new consumer group that doesn't have an initial offset in kafka, you can set AUTO_OFFSET_RESET_CONFIG:

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

For an existing consumer group, you can:

  1. Change group id to appear as new i.e. consumer-group-id-v2
  2. Implement ConsumerSeekAware so you can seek to desired offset during initialization See docs

Upvotes: 1

A.Chinese
A.Chinese

Reputation: 133

You can set a ConsumerRebalanceListener for the kafka consumer while you subscribing to some topics,in which you can get the lastest offset of each partition by KafkaConsumer.endOffsets() method, and set this to consumer by KafkaConsumer.seek() method ,like this:

kafkaConsumer.subscribe(Collections.singletonList(topics),
    new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            //do nothing
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            //get and set the lastest offset for each partiton
            kafkaConsumer.endOffsets(partitions) 
                .forEach((partition, offset) -> kafkaConsumer.seek(partition, offset));
        }
    }
);

Upvotes: 2

link
link

Reputation: 67

you can use partitionOffsets annotation to start with exact offset,for example:

@KafkaListener(id = "bar", topicPartitions =
    { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
      @TopicPartition(topic = "topic2", partitions = "0",
         partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })public void listen(ConsumerRecord<?, ?> record) {
     }

Upvotes: 1

bachrc
bachrc

Reputation: 1196

Because I didn't saw any example of this, I'm gonna explain how I did here.

The class of your @KafkaListener must implement a ConsumerSeekAware class, which will permit to the listener to control the offset seeking when partitions are attributed. (source : https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek )

public class KafkaMessageListener implements ConsumerSeekAware {
    @KafkaListener(topics = "your.topic")
    public void listen(byte[] payload) {
        // ...
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {

    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {


    }
}

Here, on a rebalance, we use the given callback to seek the last offset for all the given topics. Thanks to Artem Bilan ( https://stackoverflow.com/users/2756547/artem-bilan ) for guiding me to the answer.

Upvotes: 35

Related Questions