florins

Reputation: 1655

Monitor the lag for consumers that are manually assigned to topic partitions

I'm working with the Kafka 0.9.1 new consumer API. The consumer is manually assigned to a partition. For this consumer I would like to see its progress (meaning the lag). Since I added the group id consumer-tutorial as a property, I assumed that I could use the command

bin/kafka-consumer-groups.sh --new-consumer --describe --group consumer-tutorial --bootstrap-server localhost:9092

(as explained here http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client)

Unfortunately, my consumer group details are not shown by the above command, so I cannot monitor the progress of my consumer (its lag). How can I monitor the lag in the scenario described above (manually assigned partition)?

The code is:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

String topic = "my-topic";
TopicPartition topicPartition = new TopicPartition(topic, 0);
consumer.assign(Arrays.asList(topicPartition));
consumer.seekToBeginning(topicPartition);
try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());
    consumer.commitSync();
  }
} finally {
  consumer.close();
}

Upvotes: 1

Views: 8191

Answers (5)

Nafees Ahmed

Reputation: 17

You can use a simple and powerful lag-monitoring tool called

prometheus-kafka-consumer-group-exporter

See:

https://github.com/braedon/prometheus-kafka-consumer-group-exporter

After installation, run the command below to export consumer metrics on your required port:

/usr/bin/python3 /usr/local/bin/prometheus-kafka-consumer-group-exporter -p PORT -b KAFKA_CLUSTER_IP_PORT

After running the above command, verify the data at the HTTP URL YOUR-SERVER-IP:PORT, for example 127.0.0.1:9208.

Now you can point any scraper that reads the Prometheus metrics format at this endpoint to build dashboards and alerts. I am using Prometheus and Grafana.

This can be run on any shared server (a Kafka broker, a ZooKeeper server, a Prometheus server, or any other) because it has very low overhead on system resources.

Upvotes: 0

Stanley Ambrose

Reputation: 114

The problem with your code is directly related to the manual assignment of consumers to topic-partitions.

You specify a consumer group in the group.id property; however, the group ID is only used when you subscribe to a topic (or a set of topics) via the KafkaConsumer.subscribe() API. In your example, you are using the .assign() method, which manually attaches the client to the specified topic-partition pairs without utilising the underlying consumer group primitives. This is why you are unable to see the consumer lag. Tools such as Burrow will not work in this case either, because they query the offsets of the consumer group, which does not exist.

There are two options available to you:

  1. Use the consumer group feature properly, using the subscribe() API. This is the dominant use case for Kafka. However, seekToBeginning() will also not work in this case, as the offsets will be entirely managed by the consumer group.
  2. Drop the consumer group altogether and manage both partition assignments and offsets manually. This gives you the maximum possible flexibility but is a lot of work, and you might find yourself reinventing the wheel. Most people will not go down this path unless the consumer group feature of Kafka does not suit their needs.

The choice will depend squarely on your use case. For conventional stream processing, #1 is the idiomatic approach. This is what Kafka was designed for. #2 implies that you know what you are doing and transfers all of the group management responsibility onto your application.

Note: Kafka does not have a "partial" mode where you do some of group management and Kafka does the rest. It's either all-in or none at all.
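If you go with option 2, the lag has to be computed by your own application. As a minimal sketch of just the arithmetic (not of any Kafka API), the lag of a partition is its log end offset minus the last committed offset. The class and method names below are hypothetical, and treating a missing commit as offset 0 is an assumption that only holds if the log still starts at 0. In a real consumer the two inputs might come from calls such as consumer.committed(topicPartition) and, on a separate consumer instance, seekToEnd(topicPartition) followed by position(topicPartition); that wiring is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Kafka client library.
final class LagCalculator {

    // Lag for one partition: log end offset minus the committed offset.
    // Assumption: a missing commit (null) means nothing has been consumed yet,
    // so the committed offset is treated as 0.
    static long lag(long logEndOffset, Long committedOffset) {
        long committed = (committedOffset == null) ? 0L : committedOffset;
        // Clamp at 0 in case the committed offset was read after the end offset.
        return Math.max(0L, logEndOffset - committed);
    }

    // Per-partition lag, keyed by partition number.
    static Map<Integer, Long> lagByPartition(Map<Integer, Long> logEndOffsets,
                                             Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> result = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : logEndOffsets.entrySet()) {
            Long committed = committedOffsets.get(entry.getKey());
            result.put(entry.getKey(), lag(entry.getValue(), committed));
        }
        return result;
    }
}
```

For example, with a log end offset of 100 and a committed offset of 40, LagCalculator.lag(100L, 40L) gives a lag of 60 records. You could log or export that number yourself from inside the poll loop.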

Upvotes: 1

Vladimir Nabokov

Reputation: 1907

If you are interested in JMX exposure of consumer group lag, here is the agent I wrote: https://github.com/peterkovgan/kafka9.offsets

You can run this agent on some Kafka node and expose offset lag statistics to external readers.

There are examples of how to use this agent with Telegraf (https://influxdata.com/time-series-platform/telegraf/).

In the end (combining, e.g., Telegraf, InfluxDB and Grafana) you can see nice graphs of offset lags for several consumer groups.

Upvotes: 1

Otis Gospodnetic

Reputation: 276

Just in case you don't want to write code to get this info or run command-line tools/shell scripts ad hoc, there are a number of tools that will capture Kafka metrics, including consumer lag. Off the top of my head: Burrow and SPM for Kafka do a good job. Here is a bit of background about Kafka offsets, consumer lag, and a few metrics derived from what Kafka exposes via JMX. HTH.

Upvotes: 5

Kamal Chandraprakash

Reputation: 2002

In the kafka-consumer-groups.sh command, your group name is incorrect: it should be --group consumer-tutorial, not consumer-tutorial-group.

Upvotes: 0
