user5479362

Messages produced before first consumer connected lost

I've created a topic in Kafka using kafka-topics.sh and tested it with a Java client:

kafka-topics.sh \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 2 \
--topic my-topic

// props (bootstrap.servers, group.id, deserializers, ...) is defined elsewhere
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"), new LoggingConsumerRebalanceListener(RandomStringUtils.randomAlphanumeric(3).toLowerCase()));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    Thread.sleep(500);
}

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
    String key = Integer.toString(i + 1);
    String value = RandomStringUtils.randomAlphabetic(100);
    LOGGER.info("Sending message {}", key);
    producer.send(new ProducerRecord<String, String>("my-topic", key, value));
    Thread.sleep(100);
}
producer.close();

The producer and the consumer are separate blocks of code that I start independently.

I have observed that the code works properly in this sequence:

1. start the consumer
2. run the producer

However, in this sequence:

1. run the producer
2. start the consumer
3. run the producer again

the messages from the first run of the producer are lost. Later, if I stop the consumer, run the producer, and then start the consumer again, I get all the messages. Only the messages produced before the first consumer subscribed are lost, even though I explicitly created the topic on the command line beforehand.

What am I doing wrong here? How can I prevent messages from getting lost?

Upvotes: 0

Views: 708

Answers (1)

Michael Heil

Reputation: 18485

By default, the consumer reads from the latest offset, i.e. it only sees messages that are produced after it subscribed.

If you run the producer first and only start the consumer afterwards, the consumer ignores the messages from that first producer run and waits for new messages, such as those produced by the second producer run.

This behaviour can be changed through the configuration auto.offset.reset: setting it to earliest makes a consumer that has no committed offsets start from the beginning of the topic.
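
For example, a minimal sketch of that setting (the broker address here is an assumption, not something taken from the question):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
// With no committed offsets for the group, start from the beginning of the topic
// instead of the default "latest":
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");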

"Later, if I stop consumer, run producer and run consumer, I'm getting all messages"

This happens because your consumer uses a fixed consumer group (configuration group.id). Once that group has committed offsets to Kafka, auto.offset.reset no longer has any effect: the consumer simply continues reading the topic from where the group left off.
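
One way to observe this (a sketch; substitute whatever your consumer's group.id actually is for my-group) is to describe the group with the kafka-consumer-groups.sh tool that ships with Kafka, which lists the group's committed offset per partition:

kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group my-group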

To conclude: if you do not want to miss any messages when running your second sequence, set auto.offset.reset=earliest and use a new, unique group.id.
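
Putting both together, a minimal sketch of the revised consumer setup (the broker address, deserializers, and group naming scheme are illustrative assumptions):

import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// A fresh, unique group id has no committed offsets, so auto.offset.reset applies:
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-topic-reader-" + UUID.randomUUID());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));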

Upvotes: 0
