Reputation: 405
I have built the following Kafka consumer:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:6667");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
this.kconsumer = new KafkaConsumer<String, String>(props);
I want the consumer to start from the earliest offset for this group when it is first started. The first time I run it, it works as expected: as long as the subscription exists and the connection stays open, the offset keeps increasing.
When I log in to the Kafka server and run the following:
./kafka-consumer-groups.sh --bootstrap-server localhost:6667 --new-consumer --group TEST1 --describe
I see exactly what is expected: the offset increasing, and so on. Once the connection is closed, however, running the same command results in "Consumer group TEST1 does not exist or is rebalancing." Only it is not rebalancing; it is gone.
How do I persist the existence of the group when the consumer is not running? Am I missing a config in the consumer or in Kafka?
As another note, when I change AUTO_OFFSET_RESET_CONFIG to "latest", I get no records at all unless new ones are produced, even though the existing records have not expired.
So, bottom line: I want to spin up a new consumer with a given group name, pull from the earliest available record, shut that consumer down, and, when I start a consumer with that name again, pull from where I left off. Any ideas what I am missing? Or am I misunderstanding how the high-level consumer is meant to work?
Upvotes: 0
Views: 1157
Reputation: 405
In case someone comes across this and wants to know what I did: I set the offset policy after first determining whether the group existed. If the group already has a committed offset, I leave the consumer as built (with "latest") and it resumes from the last commit. If not, I rebuild it with "earliest".
private void buildConsumer(String offset)
{
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:6667");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    // Auto commit is disabled, so AUTO_COMMIT_INTERVAL_MS_CONFIG below has no effect.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
    // "earliest" or "latest"; consulted when the group has no committed offset for a partition.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    this.kconsumer = new KafkaConsumer<String, String>(props);
}
/*
 Check whether the group has a committed offset before polling.
 If it does, keep the consumer as built; it resumes from the last commit.
 If it does not, rebuild the consumer with "earliest" so all records are picked up.
 Note: only partition 0 of the topic is checked.
*/
private void groupExists(String topic)
{
    TopicPartition toc = new TopicPartition(topic, 0);
    OffsetAndMetadata oam = kconsumer.committed(toc);
    if (oam == null) {
        /*
         AUTO_OFFSET_RESET_CONFIG can only be set at instantiation, so for a
         new group the consumer must be rebuilt and resubscribed.
        */
        kconsumer.close(); // release the old consumer before replacing it
        buildConsumer("earliest");
        this.kconsumer.subscribe(Arrays.asList(topic));
    }
    // otherwise do nothing: all is well, start from the last commit
}
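For anyone wondering why the check on the committed offset matters, here is a toy model of the rule as I understand it. This is not Kafka code: `StartPositionModel` and `startPosition` are hypothetical names I made up, and the model only captures that a committed offset takes precedence and that the `auto.offset.reset` policy applies when none exists. It ignores out-of-range offsets and the "none" policy.

```java
import java.util.OptionalLong;

/** Toy model (not Kafka code) of where a consumer starts reading one partition. */
final class StartPositionModel {

    /**
     * @param committed   offset stored for the group, empty if nothing was ever committed
     * @param resetPolicy value of auto.offset.reset, "earliest" or "latest"
     * @param logStart    first offset still retained in the partition
     * @param logEnd      offset the next produced record will receive
     */
    static long startPosition(OptionalLong committed, String resetPolicy,
                              long logStart, long logEnd) {
        if (committed.isPresent()) {
            // A committed offset wins; the reset policy is not consulted.
            return committed.getAsLong();
        }
        switch (resetPolicy) {
            case "earliest":
                return logStart; // replay everything still retained
            case "latest":
                return logEnd;   // only records produced from now on
            default:
                throw new IllegalArgumentException("unsupported policy: " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // Partition retaining offsets 0..99, so the next record gets offset 100.
        System.out.println(startPosition(OptionalLong.empty(), "earliest", 0L, 100L)); // 0
        System.out.println(startPosition(OptionalLong.empty(), "latest", 0L, 100L));   // 100
        System.out.println(startPosition(OptionalLong.of(42L), "latest", 0L, 100L));   // 42
    }
}
```

With ENABLE_AUTO_COMMIT_CONFIG set to "false", an offset is stored only through an explicit call such as `KafkaConsumer.commitSync()`. If the polling loop never commits (the question does not show the loop, so this is an assumption), every restart takes the no-commit path, which would match what the question describes with "latest".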
Upvotes: 1