Reputation: 31
I have written code that displays the topic, partition, and log offset, but I am stuck on how to get the lag of a partition. I know there is a Kafka offset command that does this, but what I need is Java code.
public static void main(String[] args) throws Exception {
    System.out.println("START CONSUMER");
    final Properties props = new Properties();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // Create the consumer using props. Both deserializers are StringDeserializer, so the key type is String.
    final Consumer<String, String> consumer = new KafkaConsumer<>(props);
    // Manually assign every partition of the topic (this does not subscribe to it).
    int partitionCount = consumer.partitionsFor(TOPIC).size();
    ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
    for (int i = 0; i < partitionCount; i++) {
        TopicPartition partitiontemp = new TopicPartition(TOPIC, i);
        partitions.add(partitiontemp);
    }
    consumer.assign(partitions);
    // Move to the end of each partition so that position() returns the log end offset.
    consumer.seekToEnd(partitions);
    for (int i = 0; i < partitionCount; i++) {
        System.out.printf("Topic: %s partitionID: %d log offset: %d \n", TOPIC, i, consumer.position(partitions.get(i)));
    }
    System.out.printf("CREATE CONSUMER DONE");
    consumer.close();
}
What I need is to output the topic, partition, current offset, log offset, and lag. How can I get the lag, or at least the current offset, in my code? (See the image for the needed output.)
NOTE: I cannot use the for-each-record approach, because I must not read each record in the input file.
Upvotes: 2
Views: 5069
Reputation: 1
// properties must contain the bootstrap servers and the key/value deserializers needed by the consumer.
private static long calculateTopicLag(Properties properties, String topic, String consumerGroup) {
    // Create the admin client and a single consumer once, instead of one consumer per partition.
    try (AdminClient adminClient = AdminClient.create(properties);
         KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(properties)) {
        ListConsumerGroupOffsetsResult groupOffsetsResult = adminClient.listConsumerGroupOffsets(consumerGroup);
        Map<TopicPartition, OffsetAndMetadata> offsets = groupOffsetsResult.partitionsToOffsetAndMetadata().get();
        // endOffsets() does not require the partitions to be assigned to the consumer.
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(offsets.keySet());
        long topicLag = 0;
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            // Skip partitions of other topics and partitions without a committed offset.
            if (!topicPartition.topic().equals(topic) || entry.getValue() == null) {
                continue;
            }
            long consumerOffset = entry.getValue().offset();
            long endOffset = endOffsets.get(topicPartition);
            long partitionLag = endOffset - consumerOffset;
            topicLag += partitionLag;
        }
        return topicLag;
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
        return -1;
    }
}
Upvotes: 0
Reputation: 563
You can compute the lag by getting the end offsets from the consumer:
Set<TopicPartition> partitionSet = consumer.assignment();
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitionSet);
Then iterate over the set:
for (TopicPartition tp : partitionSet) {
    LOG.info("Topic :: {}, EndOffset :: {}, currentOffset :: {}", tp.topic(), endOffsets.get(tp), consumer.position(tp));
}
consumer.position(tp) returns the current offset; subtract it from the end offset to get the lag.
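As a worked illustration of that subtraction, here is a minimal sketch in plain Java with made-up offsets. The class name PartitionLag and its lag method are hypothetical helpers, not part of the Kafka client API. Clamping the result at zero is my own assumption: the two offsets are read at different moments, so a slightly stale end offset could otherwise produce a negative value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Kafka client API.
final class PartitionLag {
    private PartitionLag() {}

    // Lag = log end offset minus the consumer's current offset.
    // Clamped at zero (an assumption) in case the end offset was read before the current offset.
    static long lag(long endOffset, long currentOffset) {
        return Math.max(0L, endOffset - currentOffset);
    }

    public static void main(String[] args) {
        // Made-up sample data: partition id -> {currentOffset, endOffset}.
        Map<Integer, long[]> sample = new LinkedHashMap<>();
        sample.put(0, new long[] {100L, 120L});
        sample.put(1, new long[] {50L, 50L});
        sample.put(2, new long[] {7L, 30L});

        for (Map.Entry<Integer, long[]> e : sample.entrySet()) {
            long current = e.getValue()[0];
            long end = e.getValue()[1];
            System.out.printf("partition=%d current=%d end=%d lag=%d%n",
                    e.getKey(), current, end, lag(end, current));
        }
    }
}
```

In real code the two numbers would come from consumer.position(tp) and consumer.endOffsets(...) as described above.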
Upvotes: 1
Reputation: 26885
To reproduce the kafka-consumer-groups functionality, you need both a Consumer and an AdminClient instance.
First, using the AdminClient, call listConsumerGroupOffsets() to retrieve the list of topic-partitions and the committed offsets for a specific group.
Then use a Consumer to get the end offsets for these partitions. The method you used is inefficient: there's no need to assign and seek to the end offset. You can simply call endOffsets().
That is enough for reproducing the data contained in your screenshot.
kafka-consumer-groups also uses AdminClient.describeConsumerGroups() to print the group member assigned (if any) to each partition.
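The two steps above could be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: the class name GroupLagReport, the formatRow helper, the broker address localhost:9092, and the group name my-group are all placeholders I made up, and a secured cluster (for example SASL_PLAINTEXT, as in the question) would need the matching security properties added. It assumes the kafka-clients library is on the classpath.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical class name; illustrates the AdminClient + Consumer combination.
final class GroupLagReport {
    private GroupLagReport() {}

    // Hypothetical helper: one output row with topic, partition, current offset, log end offset, lag.
    static String formatRow(String topic, int partition, long current, long end) {
        return String.format("%s %d %d %d %d", topic, partition, current, end, end - current);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String bootstrapServers = "localhost:9092"; // assumption: replace with your brokers
        String groupId = "my-group";                // assumption: the group to inspect

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (AdminClient admin = AdminClient.create(props);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Step 1: committed offsets of the group, keyed by topic-partition.
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();

            // Step 2: log end offsets for the same partitions; no assign() or seekToEnd() needed.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(committed.keySet());

            for (Map.Entry<TopicPartition, OffsetAndMetadata> e : committed.entrySet()) {
                if (e.getValue() == null) {
                    continue; // no committed offset for this partition
                }
                TopicPartition tp = e.getKey();
                System.out.println(formatRow(tp.topic(), tp.partition(),
                        e.getValue().offset(), endOffsets.get(tp)));
            }
        }
    }
}
```

No records are polled here, so this also respects the constraint of not reading the topic's data.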
Upvotes: 6