Abdul Rahman

Reputation: 1384

Kafka Java Consumer does not pick up messages from where it last left off

I am using org.apache.kafka.clients.consumer.KafkaConsumer in my application to process messages from Kafka, and I notice the following behaviour. If I kill my app and then restart it, any messages that the Kafka producer sent while the app was down are not picked up by the consumer when the app comes back up. Here is how I configure the Kafka consumer in my application:

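import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.jdk.CollectionConverters._ // Scala 2.13+; use scala.collection.JavaConverters._ on 2.12

val props = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> EnvironmentConfig.getKafkaBootStrapServers,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.GROUP_ID_CONFIG -> "myGrouop",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)

// Create the consumer using props (KafkaConsumer expects a java.util.Map).
val consumer = new KafkaConsumer[String, String](props.asJava)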

What am I missing here in my config? Please help!

Upvotes: 2

Views: 914

Answers (1)

Mickael Maison

Reputation: 26885

Upon restarting, the consumer will attempt to resume from the last offset committed for its group. If no committed offset is found, auto.offset.reset decides where it starts; with the default value, latest, it starts from the end of the log.
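To make that rule concrete, here is a rough model of it in plain Scala. This is only an illustration, not Kafka's actual code: `StartPosition`, `resolve` and its parameters are names made up for this sketch, and it ignores the case where a committed offset is out of range.

```scala
// Simplified model of where a consumer starts reading a partition.
// Illustration only; these names are not part of the Kafka API.
object StartPosition {
  // committed:   last offset committed by the consumer group, if any
  // resetPolicy: value of auto.offset.reset ("earliest", "latest" or "none")
  // logStart:    first offset still available in the partition
  // logEnd:      offset the next produced message will get
  def resolve(committed: Option[Long], resetPolicy: String,
              logStart: Long, logEnd: Long): Long =
    committed match {
      case Some(offset) => offset // resume where the group left off
      case None =>
        resetPolicy match {
          case "earliest" => logStart
          case "latest"   => logEnd // messages produced while the app was down are skipped
          case other =>
            // With "none", the real consumer throws NoOffsetForPartitionException.
            throw new IllegalStateException(
              s"no committed offset and reset policy is '$other'")
        }
    }
}
```

With no committed offset and "latest", the model returns the log end, which matches a consumer that misses everything produced while it was stopped.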

I see that you've disabled auto-commit (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)). In that case, are you committing offsets manually?

If not, that explains why your consumer restarts from the end. It has never committed any offsets, so on each start it falls back to auto.offset.reset, which you have set to latest.
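If you do want to keep auto-commit disabled, a minimal sketch of committing manually could look like the following. `ManualCommitLoop`, `run`, `maxPolls` and the 500 ms poll timeout are assumptions made for this example; the Kafka calls used (`subscribe`, `poll(Duration)`, `commitSync()`, `close()`) are the standard consumer API. It commits once per batch after the records have been handled, which is one possible strategy rather than the only one.

```scala
import java.time.Duration
import java.util.Collections
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord}

// Hypothetical helper: poll a topic and commit offsets after each batch.
object ManualCommitLoop {
  // Polls `topic` up to `maxPolls` times, passes every record to `handle`,
  // and commits after each non-empty batch. Closes the consumer at the end.
  def run(consumer: Consumer[String, String], topic: String, maxPolls: Int)
         (handle: ConsumerRecord[String, String] => Unit): Unit = {
    consumer.subscribe(Collections.singletonList(topic))
    try {
      for (_ <- 1 to maxPolls) {
        val records = consumer.poll(Duration.ofMillis(500))
        val it = records.iterator()
        while (it.hasNext) handle(it.next())
        // Synchronously commit the offsets returned by the last poll(),
        // so a restarted consumer in the same group resumes after them.
        if (!records.isEmpty) consumer.commitSync()
      }
    } finally consumer.close()
  }
}
```

Something like `ManualCommitLoop.run(consumer, "my-topic", 100)(r => println(r.value()))` would drive it, where "my-topic" is a placeholder topic name. Committing after processing means a crash mid-batch can lead to some records being handled again on restart, but not skipped.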

To start with, I suggest you keep auto-commit enabled. Once you've figured out how it works, you can look into committing offsets manually if you need to.
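For reference, the commit-related part of the config with auto-commit turned on might look like this. Only these two settings are shown; `AutoCommitSettings` is a name invented for the sketch, and the interval is given explicitly just for illustration (5000 ms is also the default).

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig

// Hypothetical holder for the commit-related consumer settings only.
object AutoCommitSettings {
  val props: Map[String, Object] = Map[String, Object](
    // Offsets are committed periodically in the background while polling.
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (true: java.lang.Boolean),
    // How often offsets are committed, in milliseconds.
    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> (5000: java.lang.Integer)
  )
}
```

Since enable.auto.commit defaults to true, simply dropping the ENABLE_AUTO_COMMIT_CONFIG entry from your map should have the same effect.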

Upvotes: 1
