Satya
Satya

Reputation: 1037

Spring Kafka Listening to all topics and adjusting partition offsets

Based on the documentation at spring-kafka, I am using Annotation based @KafkaListener to configure my consumer.

What I see is that -

  1. Unless I specify the offset to zero, up on start, Kafka consumer picks up the future messages and not the existing ones. (I understand this is an expected result because I am not specifying the offset to what I want)

  2. I see an option in the documentation to specify a topic + partition combination and along with that an offset of zero, but if I do this - I have to explicitly specify which topic I want my consumer to listen to.

Using approach 2 above, this is how my consumer looks now -

@KafkaListener(id = "{group.id}",
        topicPartitions = {
                @TopicPartition(topic = "${kafka.topic.name}",
                        partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
        },
        containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String payload,
                   Acknowledgment ack) throws InterruptedException, IOException {

    logger.debug("This is what we received in the Kafka Consumer = " + payload);

    idService.process(payload);

    ack.acknowledge();
}

While I understand that there is an option to specify the "topicPattern" wild card or a "topics" list as a part of the annotation configuration, I don't see a place where I can provide the offset value to start from zero for the topics / topic patterns listed. Is there a way to do a combination of both? Please advise.

Upvotes: 1

Views: 8752

Answers (1)

Gary Russell
Gary Russell

Reputation: 174524

When using topics and topicPatterns (rather than explicitly declaring the partitions), Kafka decides which consumer instance will get which partitions.

Kafka will allocate the partitions and the initial offset will be the last committed for that group id. You cannot currently change that offset but we are considering adding a seek function.

If you always want to start at the first available offset, use a unique group id (e.g. UUID.randomUUID().toString()) and set

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Since Kafka will have no existing offset for that group id it will use that property to determine where to start.

You can also use MANUAL ack mode and never ack, which will effectively do the same thing.

Upvotes: 7

Related Questions