Andy

Reputation: 6329

Spring Boot consumer won't consume any messages

I have a Spring Boot consumer app. When I ran it the first time, it consumed messages from the Kafka topic, but when I ran it again, it stopped consuming. In the logs I see the following messages.

2022-08-02 00:14:08.130  INFO 96561 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-my-transformation-local-1, groupId=my-transformation-local] Fetch position FetchPosition{offset=17400, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}} is out of range for partition CraveVideoTestLog-0, resetting offset
2022-08-02 00:14:08.135  INFO 96561 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-my-transformation-local-1, groupId=my-transformation-local] Resetting offset for partition CraveVideoTestLog-0 to position FetchPosition{offset=56464, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.

I understand that the consumer could not find a valid offset to fetch from. In situations like this, the consumer falls back to the auto-offset-reset property. As you can see below, I have set it to earliest, hoping that the consumer would read from the beginning, but it does not.
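
To see what offsets the broker actually still holds for the partition, a quick check along these lines can be run (a sketch that reuses the topic and broker from the logs above):

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class OffsetRangeCheck {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            TopicPartition tp = new TopicPartition("CraveVideoTestLog", 0);
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // beginningOffsets/endOffsets return the range the broker still has;
                // a stored position below the beginning is "out of range" and triggers the reset seen in the logs.
                Map<TopicPartition, Long> begin = consumer.beginningOffsets(List.of(tp));
                Map<TopicPartition, Long> end = consumer.endOffsets(List.of(tp));
                System.out.println("beginning=" + begin.get(tp) + ", end=" + end.get(tp));
            }
        }
    }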

application.yml

spring:
  application:
    name: my-transformation
  kafka:
    bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      retries: 2
      compression-type: snappy
    consumer:
      group-id: ${GROUP_ID:my-transformation}
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

In my Java class

    @SendTo("output-topic-here")
    @KafkaListener(topics = "${my-transformation.input.topic}", errorHandler = "myErrorHandler")
    public Message<?> process(Message<String> in, ConsumerRecord<String, String> record) throws Exception {
        log.info("Going to process event for key: {} offset: {} partition: {}", record.key(), record.offset(), record.partition());
        // transformation logic omitted; the method must return the outbound Message for @SendTo
        return in;
    }

I tried out a few things.

I feel I am missing something really silly, but could not figure out what it is.

Upvotes: 2

Views: 857

Answers (2)

Andy

Reputation: 6329

Finally, I found the reason for my consumer's behaviour. The topic's cleanup policy was set to delete and its retention time was left at the default (1 week).

The consumer stopped consuming because the topic's retention time had elapsed: all the data in the topic had been deleted, so the consumer had nothing to read. When I pushed new records into the topic, the consumer started consuming again.
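
If you want to confirm this on your own topic, the retention settings can be read with Kafka's AdminClient; the sketch below (bootstrap server and topic name are taken from the question, so adjust them for your setup) prints cleanup.policy and retention.ms:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;

    public class TopicRetentionCheck {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "CraveVideoTestLog");
                Map<ConfigResource, Config> configs =
                        admin.describeConfigs(Collections.singleton(topic)).all().get();

                Config topicConfig = configs.get(topic);
                // cleanup.policy=delete removes expired segments; retention.ms defaults to 604800000 (7 days)
                System.out.println("cleanup.policy = " + topicConfig.get("cleanup.policy").value());
                System.out.println("retention.ms   = " + topicConfig.get("retention.ms").value());
            }
        }
    }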

Upvotes: 1

ChristDist

Reputation: 768

The auto.offset.reset property works as follows.

Use Case 1: A consumer starts with auto.offset.reset=latest, and the topic partition currently has data for some range of offsets. The consumer group has committed an offset for the topic before. Where will the consumer read from?

Ans: Offsets are already committed for this consumer group and topic partition, so auto.offset.reset is ignored and the consumer resumes from the committed offset.

Use Case 2: A consumer starts with auto.offset.reset=none, and the topic partition currently has data for some range of offsets. The consumer group has committed offsets for the topic before. Where will the consumer read from?

Ans: The consumer resumes from the committed offsets; auto.offset.reset=none means it will throw an exception (rather than reset) if the offsets it is recovering from have been deleted from Kafka.

Use Case 3: A consumer starts with auto.offset.reset=latest, and the topic partition currently has data for some range of offsets. The consumer group has never committed offsets for the topic before. Where will the consumer read from?

Ans: latest means consumption starts from the current end of the partition, so only records produced after the consumer starts are read.

Use Case 4: A consumer starts with auto.offset.reset=earliest, and the topic partition currently has data for some range of offsets. The consumer group has never committed offsets for the topic before. Where will the consumer read from?

Ans: earliest means consumption starts from the beginning of the partition.

Use Case 5: A consumer starts with auto.offset.reset=earliest, and the topic partition currently has data for some range of offsets. The consumer group has committed an offset for the topic before. Where will the consumer read from?

Ans: Offsets are already committed for this consumer group and topic partition, so auto.offset.reset is ignored and the consumer resumes from the committed offset.
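
To make the cases above concrete, here is a minimal plain Kafka consumer sketch (the group id is a placeholder; the topic and broker are taken from the question) showing where auto.offset.reset is applied:

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class OffsetResetDemo {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // A group id with no committed offsets exercises cases 3 and 4;
            // reusing a group id that has committed offsets exercises cases 1, 2 and 5.
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-reset-demo");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // earliest -> start from the beginning of the partition when there is no valid committed offset,
            // latest   -> start from the end,
            // none     -> throw if no committed offset is found.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("CraveVideoTestLog"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                System.out.println("Fetched " + records.count() + " records");
            }
        }
    }

Note that in cases 1, 2 and 5 the property only matters once the committed offsets are no longer valid; if you want to re-read a topic with an existing group, reset the group's offsets or switch to a new group id.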

Upvotes: 2
