Reputation: 13
I can't read Kafka partition by partition in a for loop. I don't know what is wrong with my code; it never prints the values.
For example, I want to first read all offsets in partition 0, and then read all offsets in partition 1.
(This is my first post on Stack Overflow. Sorry for my English; I hope you can understand me.)
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(topicNames);
List<KafkaTopicDataResponse> results = new ArrayList<>();
try {
    Set<TopicPartition> assignments = consumer.assignment();
    Object[] assignArray = assignments.toArray();
    for (Object topicPartition : assignArray) {
        boolean flag = true;
        int receiveRow = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            if (flag) {
                consumer.seek((TopicPartition) topicPartition, 0);
                flag = false;
            }
            for (ConsumerRecord<String, String> record : records) {
                receiveRow++;
                System.out.printf("offset = %d, partition = %d, key = %s, value = %s%n", record.offset(), record.partition(), record.key(), record.value());
                logger.info("count : " + receiveRow);
            }
            if (receiveRow >= limitRow) {
                break;
            }
        }
    }
} catch (Exception e) {
    logger.error("Exception occured while consuing messages", e);
} finally {
    consumer.close();
}
Upvotes: 1
Views: 2290
Reputation: 9347
I want to first read all offsets in partition 0, and then read all offsets in partition 1.
You said you want to read all offsets from partition 0 and then from partition 1. But what does "all" mean while data is still flowing, i.e. while new records keep arriving?
"All" has to mean all data present at a given instant. For this, you need to assign the partitions one at a time, get the end offset at that instant, and read up to that end offset.
consumer.assign(Collections.singletonList(new TopicPartition("topic", 0)));
Then fetch the end offset, because if data keeps arriving in this partition, reading it would otherwise never complete.
TopicPartition tp = new TopicPartition("topic", 0);
long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
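Now call poll() in a loop and check each record's offset. The end offset is the offset of the next record to be written, so the last existing record is at endOffset - 1; stop once a record's offset is >= endOffset - 1.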
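boolean flag = true;
while (flag) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
    for (ConsumerRecord<String, String> record : records) {
        // process the record
        if (record.offset() >= endOffset - 1) {
            // reached the last offset captured by endOffsets(); stop polling
            flag = false;
            break;
        }
    }
}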
Repeat the same steps for the other partitions.
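Putting the steps together, a minimal sketch of the whole loop could look like the following. The class name PartitionByPartitionReader and the methods readToEnd and readAllPartitions are hypothetical names chosen for illustration, and the 1-second poll timeout is an arbitrary choice. It assumes kafka-clients 2.0 or later (for poll(Duration)) and a consumer on which subscribe() has not been called, since assign() and subscribe() cannot be mixed on one consumer. Two details differ from the snippets above: it calls seekToBeginning() so reading starts at the first available offset rather than at a committed offset, and it stops on consumer.position(tp) instead of the last record's offset, so an empty partition (where poll() never returns a record) does not loop forever. As a side note, in the question's code consumer.assignment() is still empty right after subscribe(), because partitions are only assigned during poll(), so the for loop there has nothing to iterate over.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper class; names are illustrative, not from the original post.
public class PartitionByPartitionReader {

    // Reads every record that exists in the partition at the moment of the call,
    // starting from the first available offset.
    static List<ConsumerRecord<String, String>> readToEnd(
            Consumer<String, String> consumer, TopicPartition tp) {
        List<ConsumerRecord<String, String>> result = new ArrayList<>();

        // Manually assign exactly one partition (replaces any earlier assignment).
        consumer.assign(Collections.singletonList(tp));
        consumer.seekToBeginning(Collections.singletonList(tp));

        // Snapshot of the end offset: the offset of the next record to be written.
        long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);

        // position(tp) is the offset of the next record that poll() would fetch.
        while (consumer.position(tp) < endOffset) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                // Skip records that arrived after the snapshot was taken.
                if (record.offset() < endOffset) {
                    result.add(record);
                }
            }
        }
        return result;
    }

    // Reads partition 0 completely, then partition 1, and so on.
    static void readAllPartitions(Consumer<String, String> consumer, String topic) {
        // Copy before sorting: the returned list may not be modifiable.
        List<PartitionInfo> partitions = new ArrayList<>(consumer.partitionsFor(topic));
        partitions.sort(Comparator.comparingInt(PartitionInfo::partition));

        for (PartitionInfo info : partitions) {
            TopicPartition tp = new TopicPartition(info.topic(), info.partition());
            for (ConsumerRecord<String, String> record : readToEnd(consumer, tp)) {
                System.out.printf("offset = %d, partition = %d, key = %s, value = %s%n",
                        record.offset(), record.partition(), record.key(), record.value());
            }
        }
    }
}
```

With the consumer from the question, this would be called as PartitionByPartitionReader.readAllPartitions(consumer, "topic") in place of the subscribe() call and the for loop, keeping the existing try/finally so that consumer.close() still runs.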
Upvotes: 1