Reputation: 4176
I have a Kafka consumer in Golang. I don't want to consume from where I left last time, but rather current message. How can I do it?
Upvotes: 2
Views: 599
Reputation: 532
Apache kafka consumer api provides a method called kafkaConsumer.seekToEnd() which can be used to ignore the existing messages and only consume messages published after the consumer has been started without changing the current group ID of the consumer. Below is the implementation of the same. The program takes 3 arguments : topic name, group ID and offset range (0 to start from beginning, - 1 to receive messages after consumer has started, other than 0 or - 1 will imply to to consumer to consume from that offset)
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.util.*;
public class Consumer {
private static Scanner in;
public static void main(String[] argv)throws Exception{
if (argv.length != 3) {
System.err.printf("Usage: %s <topicName> <groupId> <startingOffset>\n",
Consumer.class.getSimpleName());
System.exit(-1);
}
in = new Scanner(System.in);
String topicName = argv[0];
String groupId = argv[1];
final long startingOffset = Long.parseLong(argv[2]);
ConsumerThread consumerThread = new ConsumerThread(topicName,groupId,startingOffset);
consumerThread.start();
String line = "";
while (!line.equals("exit")) {
line = in.next();
}
consumerThread.getKafkaConsumer().wakeup();
System.out.println("Stopping consumer .....");
consumerThread.join();
}
private static class ConsumerThread extends Thread{
private String topicName;
private String groupId;
private long startingOffset;
private KafkaConsumer<String,String> kafkaConsumer;
public ConsumerThread(String topicName, String groupId, long startingOffset){
this.topicName = topicName;
this.groupId = groupId;
this.startingOffset=startingOffset;
}
public void run() {
Properties configProperties = new Properties();
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "offset123");
configProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//Figure out where to start processing messages from
kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.printf("%s topic-partitions are revoked from this consumer\n", Arrays.toString(partitions.toArray()));
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.printf("%s topic-partitions are assigned to this consumer\n", Arrays.toString(partitions.toArray()));
Iterator<TopicPartition> topicPartitionIterator = partitions.iterator();
while(topicPartitionIterator.hasNext()){
TopicPartition topicPartition = topicPartitionIterator.next();
System.out.println("Current offset is " + kafkaConsumer.position(topicPartition) + " committed offset is ->" + kafkaConsumer.committed(topicPartition) );
if(startingOffset == -2) {
System.out.println("Leaving it alone");
}else if(startingOffset ==0){
System.out.println("Setting offset to begining");
kafkaConsumer.seekToBeginning(topicPartition);
}else if(startingOffset == -1){
System.out.println("Setting it to the end ");
kafkaConsumer.seekToEnd(topicPartition);
}else {
System.out.println("Resetting offset to " + startingOffset);
kafkaConsumer.seek(topicPartition, startingOffset);
}
}
}
});
//Start processing messages
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
if(startingOffset == -2)
kafkaConsumer.commitSync();
}
}catch(WakeupException ex){
System.out.println("Exception caught " + ex.getMessage());
}finally{
kafkaConsumer.close();
System.out.println("After closing KafkaConsumer");
}
}
public KafkaConsumer<String,String> getKafkaConsumer(){
return this.kafkaConsumer;
}
}
}
Upvotes: 0
Reputation: 862
You can set enable.auto.commit to false and auto.offset.reset to latest for your consumer group id. This means kafka will not be automatically committing your offsets.
With auto commit disabled, your consumer group progress would not be saved (unless you do manually). So whenever the consumer is restarted for whatever reason, it does not find its progress saved and resets to the latest offset.
Upvotes: 2
Reputation: 1771
set a new group.id
to your consumer.
Then use auto.offset.reset
to define the behavior of this new consumer group, in you case: latest
Upvotes: 0