Reputation: 9
I am trying to understand how to keep track of message ingestion with Kafka.
The workflow we follow right now is to purge all the messages in the topics and then re-ingest with code changes. I need to know how successful those code changes are. Currently I use Kafka Tool, manually refreshing the total number of messages and recording the results in a CSV, which I know is not sustainable long term.
What are your recommendations for automating the fetching of message counts for Kafka topics? Ideally I would like to query the topic every minute and get the counts, as well as counts over time windows such as 1 day.
*I cannot use KSQL because of stability issues we are having.
Upvotes: 0
Views: 10266
Reputation: 616
If you have metrics enabled, this is how you can do it in Prometheus:
sum(kafka_topic_partition_current_offset{topic="mytopic"}) - sum(kafka_topic_partition_oldest_offset{topic="mytopic"})
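To automate this rather than reading a dashboard, the same expression can be sent to the Prometheus HTTP API. The sketch below is only an illustration: `PROM_URL` and the helper names `retained_query`, `ingested_query` and `prom_query` are assumptions, not part of any tool. The second query uses `increase()` over a window such as `1m` or `1d` to estimate how many messages were appended in that window. `increase()` is designed for counters and extrapolates, so treat the result as approximate, especially around a purge or topic recreation.

```shell
#!/usr/bin/env bash
# Sketch only: PROM_URL and the function names are assumptions for illustration.
PROM_URL="${PROM_URL:-http://localhost:9090}"

# PromQL for the number of messages currently retained in a topic
# (latest offsets minus oldest offsets, summed over partitions).
retained_query() {
  local topic="$1"
  printf 'sum(kafka_topic_partition_current_offset{topic="%s"}) - sum(kafka_topic_partition_oldest_offset{topic="%s"})' "$topic" "$topic"
}

# PromQL for how far the latest offsets advanced over a window (e.g. 1m, 1d).
ingested_query() {
  local topic="$1" window="$2"
  printf 'sum(increase(kafka_topic_partition_current_offset{topic="%s"}[%s]))' "$topic" "$window"
}

# Send an instant query to the Prometheus HTTP API and print the JSON response.
prom_query() {
  curl -sG "${PROM_URL}/api/v1/query" --data-urlencode "query=$1"
}
```

For example, `prom_query "$(ingested_query mytopic 1d)"` would ask for the approximate number of messages appended to `mytopic` over the last day.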
Upvotes: 0
Reputation: 853
https://stackoverflow.com/a/63191575/163585
To get the number of messages in a Kafka topic:
brokers="<broker1:port>"
topic="<topic-name>"
# Sum of the latest (end) offsets across all partitions
sum_1=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "$brokers" --topic "$topic" --time -1 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum + 0}')
# Sum of the earliest (beginning) offsets across all partitions
sum_2=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "$brokers" --topic "$topic" --time -2 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum + 0}')
echo "Number of records in topic ${topic}: $((sum_1 - sum_2))"
where --time -1 returns the current maximum (latest) offset and --time -2 returns the current minimum (earliest) offset.
The subtraction is needed because messages are deleted from the topic once the retention period expires, so the latest offset alone != the count of messages.
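To replace the manual refresh-and-CSV routine from the question, the same two offset lookups can be wrapped in functions and called once a minute. This is a minimal sketch under stated assumptions: `KAFKA_BIN`, the CSV layout, and the function names `sum_offsets`, `topic_offset_sum` and `log_count` are all made up for illustration, and the install path is simply the one used in the script above.

```shell
#!/usr/bin/env bash
# Sketch only: KAFKA_BIN, the CSV layout and the function names are assumptions.
KAFKA_BIN="${KAFKA_BIN:-/usr/hdp/current/kafka-broker/bin}"

# Sum the offset field of "topic:partition:offset" lines read from stdin.
# Lines that do not end in ":<digits>:<digits>" (e.g. log noise) are ignored.
sum_offsets() {
  awk -F ":" '/:[0-9]+:[0-9]+$/ {sum += $3} END {print sum + 0}'
}

# Print the summed offsets of a topic.
# $1 = broker list, $2 = topic, $3 = -1 (latest) or -2 (earliest).
topic_offset_sum() {
  "$KAFKA_BIN/kafka-run-class.sh" kafka.tools.GetOffsetShell \
    --broker-list "$1" --topic "$2" --time "$3" | sum_offsets
}

# Append one "timestamp,topic,count" row to a CSV file.
# $1 = broker list, $2 = topic, $3 = path of the CSV file.
log_count() {
  local brokers="$1" topic="$2" csv="$3" latest earliest
  latest=$(topic_offset_sum "$brokers" "$topic" -1)
  earliest=$(topic_offset_sum "$brokers" "$topic" -2)
  echo "$(date -u +%Y-%m-%dT%H:%M:%SZ),${topic},$((latest - earliest))" >> "$csv"
}
```

A loop such as `while true; do log_count "$brokers" "$topic" counts.csv; sleep 60; done`, or an equivalent cron entry, would then produce one row per minute. Counts for a longer window, such as 1 day, can be derived afterwards by comparing rows that far apart, bearing in mind that retention deletes will lower the count during the window.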
Upvotes: 0
Reputation: 39830
Message counts also depend on log compaction.
For example, you might observe "strange results" when log compaction is in effect for a topic.
Say you have a topic myTopic that has 100 messages in total. If a log compaction policy takes effect now, the count might drop to, say, 20 messages because older messages have been compacted away.
To get the latest offset per partition, which equals the message count per partition only when nothing has been deleted or compacted, you can use the following command:
kafka-run-class kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic myTopic
and the result would be similar to the one below (assuming myTopic has 3 partitions):
myTopic:2:34
myTopic:1:33
myTopic:0:33
Alternatively, for the overall sum you can use this one:
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic myTopic \
--time -1 \
--offsets 1 \
| awk -F ":" '{sum += $3} END {print sum}'
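As a quick check of the awk stage that needs no broker, the three sample lines shown earlier can be piped through the same expression. The variable name `total` is only for illustration.

```shell
# Feed the sample GetOffsetShell output through the same awk summation.
total=$(printf 'myTopic:2:34\nmyTopic:1:33\nmyTopic:0:33\n' \
  | awk -F ":" '{sum += $3} END {print sum}')
echo "$total"   # prints 100 (34 + 33 + 33)
```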
You might also find some Kafka monitoring tools useful in this context. More precisely, CMAK (aka kafka-manager) has a section in its metrics called Summed Recent Offsets.
Upvotes: 3