Reputation: 863
I know Kafka is meant to be treated as an infinite stream of events, and getting the remaining message count isn't built-in functionality, but I have to somehow monitor how my consumer processes are doing and whether I'm providing enough resources for them.
The overall scenario is basic Kafka usage: a couple of producers on different servers insert into a topic, and consumers in group_a read messages, perform some AI work, and insert into another topic for further processing.
The rate of incoming messages is in no way constant or predictable, so I need a way to check whether my consumers are lagging behind (say, if group_a has more than 1000 unread messages in my input topic).
Considering I have full control over the Kafka setup, as well as the consumer and producer code, what are my options here?
EDIT: if you need to implement this in python yourself, I posted my solution as an answer below
Upvotes: 3
Views: 5104
Reputation: 863
For anyone who needs to do this in Python, this is what I ended up implementing using the kafka-python package.
import logging

from kafka import KafkaAdminClient
from kafka.protocol.offset import OffsetRequest_v0, OffsetResetStrategy

logger = logging.getLogger(__name__)


class KafkaMonitor:
    def __init__(self, host) -> None:
        self.host = host
        self.connected = False
        self.connect()

    def connect(self):
        try:
            self.admin = KafkaAdminClient(bootstrap_servers=self.host)
            self.connected = True
            return True
        except Exception:
            self.admin = None
            self.connected = False
            return False

    def get_stats(self, target_topic, target_group):
        stats = {"available": False, "has_data": False, "lag": {}}
        if (not self.connected) and (not self.connect()):
            return stats
        stats["available"] = True
        try:
            # committed offsets for the group, keyed by (topic, partition)
            offsets = self.admin.list_consumer_group_offsets(group_id=target_group)
            for (topic, partition), (offset, metadata) in offsets.items():
                if topic == target_topic:
                    stats["lag"][partition] = {"offset": offset}
            if stats["lag"]:
                # ask each broker for the latest (end) offset of the partitions it hosts
                for broker in self.admin._client.cluster.brokers():
                    topic_partition_list = self.admin._client.cluster.partitions_for_broker(broker.nodeId)
                    if topic_partition_list is None:
                        continue
                    partitions = [partition_i for topic_i, partition_i in topic_partition_list
                                  if topic_i == target_topic]
                    # replica_id=-1 marks this as an ordinary consumer request
                    request = OffsetRequest_v0(replica_id=-1, topics=[
                        (target_topic, [(partition_i, OffsetResetStrategy.LATEST, 1)
                                        for partition_i in partitions])])
                    future = self.admin._send_request_to_node(node_id=broker.nodeId, request=request)
                    self.admin._client.poll(future=future)
                    response = future.value
                    for topic, meta in response.topics:
                        if topic == target_topic:
                            for partition, error_code, end in meta:
                                stats["lag"][partition]["end"] = end[0]
                                stats["lag"][partition]["lag"] = (
                                    stats["lag"][partition]["end"] - stats["lag"][partition]["offset"])
                                stats["has_data"] = True
        except Exception as e:
            logger.error(e)
            self.connected = False
        return stats
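On top of get_stats() you can then implement the "more than 1000 unread messages" check from the question. A minimal sketch (the helper names and the threshold are illustrative, not part of the class above):

```python
def total_lag(stats):
    """Sum per-partition lag from a KafkaMonitor.get_stats() result."""
    if not stats["has_data"]:
        return None  # no reliable data: treat as unknown rather than zero
    return sum(p["lag"] for p in stats["lag"].values())


def is_lagging(stats, threshold=1000):
    """True when the group has more than `threshold` unread messages in total."""
    lag = total_lag(stats)
    return lag is not None and lag > threshold


# Illustrative stats dict shaped like get_stats() output:
stats = {"available": True, "has_data": True,
         "lag": {0: {"offset": 100, "end": 700, "lag": 600},
                 1: {"offset": 50, "end": 550, "lag": 500}}}
print(total_lag(stats))   # 1100
print(is_lagging(stats))  # True
```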
Upvotes: 1
Reputation: 61
Kafka maintains metadata of the message offsets per partition consumed by each consumer. Partitioning enables you to have multiple consumers for a single topic.
Lag is the delta between the consumer group offset and the latest offset in the topic.
This metadata is maintained by Kafka, so you do not need to track it in your app.
You can check consumer group offsets and lag using, for example, the CLI tool.
In the example below, the foo group has consumed all messages from the topic "quickstart-events", so lag is 0. In this example there is just one partition and one consumer for the topic.
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group foo
GROUP  TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                          HOST        CLIENT-ID
foo    quickstart-events  0          3               3               0    consumer-foo-1-0fbf8d40-732f-483d-91d3-9b6f686b5040  /127.0.0.1  consumer-foo-1
Same information is available also via the AdminClient API, see describe_consumer_groups.
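With kafka-python, the same per-partition lag can be assembled from two public calls: KafkaAdminClient.list_consumer_group_offsets() for the committed offsets and KafkaConsumer.end_offsets() for the log-end offsets. The sketch below only shows the arithmetic joining the two; the group and topic names in the comments are placeholders, and partitions with no committed offset are assumed to start from 0:

```python
def compute_lag(committed, end_offsets):
    """Per-partition lag, given committed offsets and log-end offsets,
    both as dicts keyed by partition number."""
    # no committed offset yet -> assume 0, i.e. the whole partition is unread
    return {p: end_offsets[p] - committed.get(p, 0) for p in end_offsets}


# In a live setup the two dicts would come from something like:
#   admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
#   committed = {tp.partition: om.offset
#                for tp, om in admin.list_consumer_group_offsets("foo").items()
#                if tp.topic == "quickstart-events"}
#   consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
#   tps = [TopicPartition("quickstart-events", p)
#          for p in consumer.partitions_for_topic("quickstart-events")]
#   end_offsets = {tp.partition: off
#                  for tp, off in consumer.end_offsets(tps).items()}

# Matching the CLI output above: current offset 3, log-end offset 3 -> lag 0
print(compute_lag({0: 3}, {0: 3}))  # {0: 0}
```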
If you need to automate reacting to the lag, it might be better to have a separate monitoring process for the lag instead of trying to detect it inside the consumer processes. One tool for such an approach is the Kafka Lag Exporter. The benefit of this approach is that you can build alerts and dashboards using generic monitoring tools, but of course there's a bit of work required to set up the necessary infrastructure.
It's important to have a sufficient number of partitions for each topic, since the maximum number of concurrent consumers for a topic is determined by the number of partitions.
Upvotes: 3