Reputation: 416
I am getting below info message every time in kafka consumer.
2020-07-04 14:54:27.640 INFO 1 --- [istener-0-0-C-1] c.n.o.c.h.p.n.PersistenceKafkaConsumer : beginning to consume batch messages , Message Count :11
2020-07-04 14:54:27.809 INFO 1 --- [istener-0-0-C-1] c.n.o.c.h.p.n.PersistenceKafkaConsumer : Execution Time :169
2020-07-04 14:54:27.809 INFO 1 --- [istener-0-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {nbi.cm.changes.mo.test23-1=OffsetAndMetadata{offset=5705, leaderEpoch=null, metadata=''}}
2020-07-04 14:54:27.812 INFO 1 --- [istener-0-0-C-1] c.n.o.c.h.p.n.PersistenceKafkaConsumer : Acknowledgment Success
2020-07-04 14:54:27.813 INFO 1 --- [istener-0-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-1, groupId=cm-persistence-notification] Fetch offset 5705 is out of range for partition nbi.cm.changes.mo.test23-1, resetting offset
2020-07-04 14:54:27.820 INFO 1 --- [istener-0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-1, groupId=cm-persistence-notification] Resetting offset for partition nbi.cm.changes.mo.test23-1 to offset 666703.
Got OFFSET_OUT_OF_RANGE error in debug log and resetting to some other partition that actually not exist. Same all messages able to receive in consumer console.
But actually I committed offset before that only , offset are available in kafka , log retention policy is 24hr, so it's not deleted in kafka.
In debug log, I got below messages:
beginning to consume batch messages , Message Count :710
2020-07-02 04:58:31.486 DEBUG 1 --- [ce-notification] o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-1, groupId=cm-persistence-notification] Node 1002 sent an incremental fetch response for session 253529272 with 1 response partition(s)
2020-07-02 04:58:31.486 DEBUG 1 --- [ce-notification] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-1, groupId=cm-persistence-notification] Fetch READ_UNCOMMITTED at offset 11372 for partition nbi.cm.changes.mo.test12-1 returned fetch data (error=OFFSET_OUT_OF_RANGE, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=0)
When all we will get OFFSET_OUT_OF_RANGE.
Listener Class :
@KafkaListener( id = "batch-listener-0", topics = "topic1", groupId = "test", containerFactory = KafkaConsumerConfiguration.CONTAINER_FACTORY_NAME )
public void receive(
@Payload List<String> messages,
@Header( KafkaHeaders.RECEIVED_MESSAGE_KEY ) List<String> keys,
@Header( KafkaHeaders.RECEIVED_PARTITION_ID ) List<Integer> partitions,
@Header( KafkaHeaders.RECEIVED_TOPIC ) List<String> topics,
@Header( KafkaHeaders.OFFSET ) List<Long> offsets,
Acknowledgment ack )
{
long startTime = System.currentTimeMillis();
handleNotifications( messages ); // will take more than 5s to process all messages
long endTime = System.currentTimeMillis();
long timeElapsed = endTime - startTime;
LOGGER.info( "Execution Time :{}", timeElapsed );
ack.acknowledge();
LOGGER.info( "Acknowledgment Success" );
}
Do i need to close consumer here , i thought spring-kafka automatically take care those , if no could you please tell how to close in apring-kafka and also how to check if rebalance happened or not , because in DEBUG log not able to see any log related to rebalance.
Upvotes: 0
Views: 3348
Reputation: 11890
I think your consumer may be rebalancing, because you are not calling consumer.close()
at the end of your process.
This is a guess, but if the retention policy isn't kicking in (and the logs are not being deleted), this is the only reason I can tell for that behaviour.
Update:
As you set them as @KafkaListeners
, you could just call stop()
on the KafkaListenerEndpointRegistry: kafkaListenerEndpointRegistry.stop()
Upvotes: 2