Reputation: 987
I have a stream in java built as (anonimized some variables and classes):
Map<String, Object> props = new HashMap<>();
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka-broker:6667");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> events = builder.stream("my-topic");
events.foreach((key, value) -> {
CustomClass instance = new CustomClass(value);
for (AnotherCustomClass anotherInstance: someIterator) {
anotherInstance(instance);
}
});
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
for kafka 0.10.0.0
:
compile group: 'org.apache.kafka', name: 'kafka-streams', version: '0.10.0.0'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.0'
My question is the following:
KafkaStreams streams.metrics
inside the foreach
loop? In order to read and / or print the proccesed messages throughputanotherInstance(instance)
has been evaluatedUpvotes: 1
Views: 2514
Reputation: 783
Kafka Streams exposes all metrics via JMX (Java Management Extensions). You can check those metrics by using JConsole or VisualVM. With those tools you can explore all metrics and graph them.
In order to check how many messages your application is processing have a look at that metric:
MBean: kafka.streams:type=stream-metrics,thread.client-id=[threadId]
Attribute: process-rate
It tells you average number of processed messages per second across all tasks.
Full list of Kafka Streams metrics can be found in official documentation.
Upvotes: 3