rysisnice

Reputation: 9

Counting Messages in Kafka Topics

I am trying to understand how to keep track of message ingestion with Kafka.

Our current workflow is to purge all the messages in the topics and then re-ingest with code changes. I need to know how successful those code changes are. At the moment I use Kafka Tool, manually refresh the total number of messages, and keep my results in a CSV, which I know is not sustainable long term.

What do you recommend for automating the retrieval of message counts for Kafka topics? Ideally I would like to poll each topic once a minute and get the counts, as well as counts over time windows such as one day.

*I cannot use KSQL because of stability issues we are having.

Upvotes: 0

Views: 10266

Answers (3)

armourbear

Reputation: 616

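If you have metrics enabled, this is how you can do it in Prometheus (the metric names below are the ones I know from kafka_exporter; adjust them if your exporter names them differently):

sum(kafka_topic_partition_current_offset{topic="mytopic"}) - sum(kafka_topic_partition_oldest_offset{topic="mytopic"})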

Upvotes: 0

spats

Reputation: 853

https://stackoverflow.com/a/63191575/163585

To get the number of messages in a Kafka topic:

brokers="<broker1:port>"
topic="<topic-name>"
# Sum of the latest (end) offsets across all partitions
sum_1=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "$brokers" --topic "$topic" --time -1 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum}')
# Sum of the earliest (start) offsets across all partitions
sum_2=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "$brokers" --topic "$topic" --time -2 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum}')
echo "Number of records in topic ${topic}: $((sum_1 - sum_2))"

where --time -1 returns the current maximum (latest) offset and --time -2 returns the current minimum (earliest) offset.

Both values are needed because messages are deleted from the topic once the retention period expires, so the latest offset on its own is not the number of messages currently in the topic.
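To automate the minute-by-minute logging the question asks about, the same idea can be wrapped in a few functions. This is only a sketch under some assumptions: the function names (sum_offsets, estimate_count, append_sample) and the CSV layout are made up for illustration, and the two input files are assumed to hold the saved output of GetOffsetShell with --time -1 and --time -2.

```shell
#!/bin/sh
# Sum the offset column (third ':'-separated field) of GetOffsetShell output
# read from stdin. Lines that are not topic:partition:offset are skipped.
sum_offsets() {
  awk -F ':' 'NF == 3 && $3 ~ /^[0-9]+$/ {sum += $3} END {printf "%.0f\n", sum}'
}

# Estimate the record count as (sum of latest offsets) - (sum of earliest offsets).
# $1 = file with the --time -1 output, $2 = file with the --time -2 output.
estimate_count() {
  latest=$(sum_offsets < "$1")
  earliest=$(sum_offsets < "$2")
  echo $((latest - earliest))
}

# Hypothetical helper: append "UTC timestamp,topic,count" to a CSV file.
# $1 = topic name, $2 = count, $3 = CSV path.
append_sample() {
  printf '%s,%s,%s\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$1" "$2" >> "$3"
}
```

A cron job or a simple `while sleep 60` loop could save the two GetOffsetShell outputs to files, call estimate_count on them, and pass the result to append_sample, which would replace the manual refresh-and-copy step.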

Upvotes: 0

Giorgos Myrianthous

Reputation: 39830

Message counts also depend on log compaction.


For example, you might observe "strange results" when log compaction is in effect for a topic. Say you have a topic myTopic that holds 100 messages in total. If a log compaction policy takes effect now, the count might drop to, say, 20 messages because older messages with the same key have been compacted away.
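To make the effect concrete, here is a toy model of compaction. It is not Kafka code, just an illustration with made-up data: each line is an "offset key" pair, compaction keeps only the newest record per key, and offsets are never renumbered. An offset-based count therefore still sees the full span, while fewer records actually remain.

```shell
#!/bin/sh
# Toy model of log compaction, not Kafka code. Each input line is "offset key".
# Compaction keeps only the most recent record for every key; offsets stay as they were.
compact() {
  awk '{last[$2] = $1} END {for (k in last) print last[k], k}' | sort -n
}

# Six made-up records over three keys, at offsets 0 to 5.
log_records() {
  printf '%s\n' '0 a' '1 b' '2 a' '3 c' '4 b' '5 a'
}

# What "latest offset - earliest offset" would report: end offset 6, start offset 0.
offset_span=$((6 - 0))
# Records that are actually still in the log after compaction.
retained=$(log_records | compact | awk 'END {print NR}')

echo "offset span: $offset_span, retained records: $retained"
```

In this sketch the offset difference is 6 while only 3 records survive, so on a compacted topic the offset-based number is better read as an upper bound than as an exact count.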


To get the latest offset of each partition (which equals the per-partition message count only if nothing has been deleted or compacted yet), you can use the following command:

kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic myTopic 

and the result would be something similar to the one below (assuming myTopic has 3 partitions):

myTopic:2:34 
myTopic:1:33 
myTopic:0:33

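Alternatively, for the overall sum across partitions you can use this one (note that --broker-list must point at a Kafka broker, e.g. port 9092, not at ZooKeeper on 2181):

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic myTopic \
  --time -1 \
  --offsets 1 \
  | awk -F  ":" '{sum += $3} END {print sum}'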
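If you record that summed latest offset at regular intervals, the difference between two samples is the number of messages produced in between, which covers both the per-minute view and longer windows such as one day. Below is a sketch that assumes a hypothetical CSV of "epoch_seconds,summed_latest_offset" lines on stdin; the function names are made up for illustration.

```shell
#!/bin/sh
# Input on stdin: one sample per line, "epoch_seconds,summed_latest_offset",
# oldest first (assumed format, e.g. written by a once-a-minute cron job).

# Print "timestamp,messages_since_previous_sample" for each consecutive pair.
produced_per_interval() {
  awk -F ',' 'NR > 1 {printf "%s,%.0f\n", $1, $2 - prev} {prev = $2}'
}

# Print the total produced between the first and the last sample in the input.
produced_in_window() {
  awk -F ',' 'NR == 1 {first = $2} {last = $2} END {printf "%.0f\n", last - first}'
}
```

For a one-day window you would feed produced_in_window only the samples from that day. One caveat: if the topic is deleted and recreated as part of the purge, offsets restart at 0, so a difference that spans the recreation is not meaningful.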

You might also find some Kafka monitoring tools useful in this context. In particular, CMAK (formerly kafka-manager) has a metrics section that shows Summed Recent Offsets.

Upvotes: 3
