Reputation: 6329
I have a Spring Boot consumer app. When I ran it the first time, it consumed messages from the Kafka topic. But when I ran it again, it stopped consuming. In the logs I see the following messages.
2022-08-02 00:14:08.130 INFO 96561 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-my-transformation-local-1, groupId=my-transformation-local] Fetch position FetchPosition{offset=17400, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}} is out of range for partition CraveVideoTestLog-0, resetting offset
2022-08-02 00:14:08.135 INFO 96561 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-my-transformation-local-1, groupId=my-transformation-local] Resetting offset for partition CraveVideoTestLog-0 to position FetchPosition{offset=56464, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
I understand that the consumer could not get the offset. In situations like this, the consumer refers to the auto-offset-reset property. As you can see, I have set it to earliest, hoping that the consumer would read from the beginning. But it does not.
application.yml
spring:
  application:
    name: my-transformation
  kafka:
    bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      retries: 2
      compression-type: snappy
    consumer:
      group-id: ${GROUP_ID:my-transformation}
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
In my Java class
@SendTo("output-topic-here")
@KafkaListener(topics = "${my-transformation.input.topic}", errorHandler = "myErrorHandler")
public Message<?> process(Message<String> in, ConsumerRecord<String, String> record) throws Exception {
    log.info("Going to process event for key: {} offset: {} partition: {}", record.key(), record.offset(), record.partition());
    return in; // a message must be returned so @SendTo can forward it to the output topic
}
I tried out a few things. I set the auto-offset-reset value to none. As expected, it threw an exception complaining about the offset. I feel I am missing something really silly, but could not figure out what it is.
Upvotes: 2
Views: 857
Reputation: 6329
Finally, I found out the reason for my consumer's behaviour. The topic's retention policy was set to delete and the retention time was set to the default (1 week).
The consumer stopped consuming records because the topic's retention time had elapsed, so all the data in the topic had been deleted and the consumer had nothing to read. When I pushed new records into the topic again, the consumer started consuming.
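As a quick check, a minimal sketch using Kafka's AdminClient can print the topic's cleanup policy and retention time (topic name and broker address are taken from the question; the class name is just for illustration):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TopicRetentionCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "CraveVideoTestLog");
            Map<ConfigResource, Config> configs = admin.describeConfigs(List.of(topic)).all().get();
            // cleanup.policy=delete combined with an elapsed retention.ms is what removed the old records
            System.out.println("cleanup.policy = " + configs.get(topic).get("cleanup.policy").value());
            System.out.println("retention.ms   = " + configs.get(topic).get("retention.ms").value());
        }
    }
}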
Upvotes: 1
Reputation: 768
The auto.offset.reset property works in the following ways.
Use Case 1: A consumer starts with auto.offset.reset=latest, and the topic partition currently has data for some range of offsets. The consumer group has previously committed offsets for this topic partition. Where will the consumer read from?
Ans: The offsets are already committed for this consumer group and topic partition, so auto.offset.reset is ignored and the consumer resumes from the committed offset.
Use Case 2: A consumer starts with auto.offset.reset=none, and the topic partition currently has data for some range of offsets. The consumer group has previously committed offsets for this topic partition. Where will the consumer read from?
Ans: auto.offset.reset=none means the consumer will crash if the offsets it is recovering from have been deleted from Kafka; otherwise it resumes from the committed offset.
Use Case 3: A consumer has auto.offset.reset=latest, and the topic partition currently has data for some range of offsets. The consumer group has never committed offsets for this topic before. Where will the consumer read from?
Ans: latest means reading starts from the end of the partition, i.e. only records produced after the consumer subscribes are read.
Use Case 4: A consumer has auto.offset.reset=earliest, and the topic partition currently has data for some range of offsets. The consumer group has never committed offsets for this topic before. Where will the consumer read from?
Ans: earliest means reading starts from the beginning of the partition.
Use Case 5: A consumer starts with auto.offset.reset=earliest, and the topic partition currently has data for some range of offsets. The consumer group has previously committed offsets for this topic partition. Where will the consumer read from?
Ans: The offsets are already committed for this consumer group and topic partition, so auto.offset.reset is ignored and the consumer resumes from the committed offset.
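To see this behaviour outside Spring, here is a minimal plain KafkaConsumer sketch (broker, group id and topic are taken from the question; the single poll is only illustrative). The auto.offset.reset setting is consulted only when the group has no valid committed offset for the partition.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class OffsetResetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-transformation-local");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Only used when the group has no valid committed offset
        // (first run, committed offset expired, or offset out of range).
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("CraveVideoTestLog"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r -> System.out.printf("partition=%d offset=%d key=%s%n",
                    r.partition(), r.offset(), r.key()));
        }
    }
}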
Upvotes: 2