Baskar
Baskar

Reputation: 41

Spring Cloud Stream with Kafka - message not being read after restarting the consumer

I have a micro service based application which reads messages from a Kafka topic. When the service is down, if there are any messages being written on the topic, I want the consumer to read those messages when it is up and running the next time. But I am missing all the messages when the service was down. How can I get the consumer to read the messages that were not read when the service was down?

I am getting all the messages when my micro service was up and any messages being return to the topic.

My application.properties:

spring.cloud.stream.bindings.input.destination=test
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.startOffset=latest
spring.cloud.stream.bindings.input.consumer.resetOffsets=true
spring.cloud.stream.bindings.input.consumer.instanceCount=3
spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false

// this is my consumer code under my micro service root dir

@EnableBinding(Sink.class) 
public class Consumer { 
    @ServiceActivator(inputChannel = Sink.INPUT)
    public void consoleSink(Object payload){
        logger.info("Type: "+ payload.getClass() + " which is byte array");
        logger.info( "Payload: " + new String((byte[])payload));
    } }

I appreciate any clue to fix this issue.

Upvotes: 1

Views: 844

Answers (1)

Baskar
Baskar

Reputation: 41

Setting the below properties helped me to fix my issue.

spring.cloud.stream.bindings.input.destination=test
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.startOffset=latest
spring.cloud.stream.bindings.input.consumer.resetOffsets=true
spring.cloud.stream.bindings.input.consumer.instanceCount=3
spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.bindings.input.group=testGroup50
spring.cloud.stream.bindings.input.partitioned=false

Thanks,

BR

Upvotes: 1

Related Questions