sr1977

Reputation: 25

Kafka Streams KTable size monitoring

I have a Streams topology that consumes from a topic, runs an aggregation, and builds a KTable that is materialized into RocksDB.

I have another application that consumes all events from that same topic daily and sends tombstone messages for events that meet some specific criteria (i.e. they are no longer needed). The aggregation handles these and deletes the entries from the state store, but I'd like to monitor either the size of the state store or the changelog topic: anything, really, that tells me the size of the KTable.

I have exposed the JMX metrics, but nothing there appears to give me what I need. I can see the total number of "puts" into RocksDB, but not the total number of keys. My apps are Spring Boot and I would like to expose the metrics via Prometheus.

Has anyone solved this issue, or does anyone have ideas that would help?

Upvotes: 2

Views: 1343

Answers (2)

krizajb

Reputation: 1814

If you have JMX metrics exposed, you can get many Kafka metrics. The one you are looking for is kafka_stream_state_estimate_num_keys.
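To check whether that metric is actually registered in your app, here is a minimal sketch that reads it straight from the JVM's MBean server using only the JDK. The class name KTableSizeProbe is hypothetical, and the MBean pattern (kafka.streams:type=stream-state-metrics) and attribute name (estimate-num-keys) are assumptions about how your Kafka Streams version names its RocksDB metrics, so verify them with jconsole or a similar tool first. The value is RocksDB's own estimate, not an exact count.

```java
import java.lang.management.ManagementFactory;
import javax.management.AttributeNotFoundException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical helper, not part of Kafka Streams.
final class KTableSizeProbe {

    // Assumed names: check them against the MBeans your application registers.
    static final String DEFAULT_PATTERN = "kafka.streams:type=stream-state-metrics,*";
    static final String ATTRIBUTE = "estimate-num-keys";

    /** Sums the key estimate over every MBean matching the pattern (0 if none match). */
    static long sumEstimatedKeys(MBeanServer server, String pattern) {
        long total = 0;
        try {
            for (ObjectName name : server.queryNames(new ObjectName(pattern), null)) {
                try {
                    Object value = server.getAttribute(name, ATTRIBUTE);
                    if (value instanceof Number) {
                        total += ((Number) value).longValue();
                    }
                } catch (AttributeNotFoundException e) {
                    // This store does not expose the estimate; skip it.
                }
            }
        } catch (JMException e) {
            throw new IllegalStateException("Could not read state store metrics", e);
        }
        return total;
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        System.out.println(sumEstimatedKeys(server, DEFAULT_PATTERN));
    }
}
```

The default pattern matches every state store in the application. If you have more than one store, narrow the pattern with the key property that carries the store name in your MBean names.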

Upvotes: 1

Tuyen Luong

Reputation: 1366

You can get the approximate count for each partition by accessing the KTable's underlying state store and calling KeyValueStore#approximateNumEntries(), then exporting that count to Prometheus (there is one count per partition).

To access the underlying state store, you can use the low-level Processor API to get a KeyValueStore through the ProcessorContext of each StreamTask (each task corresponds to a partition). Just add a KStream#transformValues() to your topology:

kStream
        ...
        .transformValues(ExtractCountTransformer::new, "your_ktable_name")
        ...

Then, in ExtractCountTransformer, export the count to Prometheus:

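import lombok.extern.log4j.Log4j2;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

@Log4j2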
public class ExtractCountTransformer implements ValueTransformerWithKey<String, String, String> {

    private KeyValueStore<String, String> yourKTableKvStore;
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        yourKTableKvStore = (KeyValueStore<String, String>) context.getStateStore("your_ktable_name");
    }

    @Override
    public String transform(String readOnlyKey, String value) {
        // Read the estimate once, then export it to Prometheus (e.g. as a gauge per partition).
        long approxCount = yourKTableKvStore.approximateNumEntries();
        log.debug("partition {} - approx count {}", context.partition(), approxCount);
        return value;
    }

    @Override
    public void close() {

    }
}
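The "export to Prometheus" step is left open above. As a dependency-free sketch of one way to do it, the hypothetical PartitionCountGauges class below keeps the latest count per partition and renders it in the Prometheus text exposition format. The class, its methods, and the metric name are all illustrative assumptions. In a Spring Boot app you would more likely register these values as gauges with your existing metrics library.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical helper: holds the latest approximate count per partition.
final class PartitionCountGauges {

    private final String metricName;
    // Sorted, thread-safe map, since several stream threads may report at once.
    private final Map<Integer, Long> latest = new ConcurrentSkipListMap<>();

    PartitionCountGauges(String metricName) {
        this.metricName = metricName;
    }

    /** Call from transform() with context.partition() and approximateNumEntries(). */
    void record(int partition, long approxCount) {
        latest.put(partition, approxCount);
    }

    /** Sum of the most recent count reported by each partition. */
    long total() {
        long sum = 0;
        for (long count : latest.values()) {
            sum += count;
        }
        return sum;
    }

    /** Renders one gauge sample per partition in Prometheus text format. */
    String render() {
        StringBuilder sb = new StringBuilder("# TYPE " + metricName + " gauge\n");
        latest.forEach((partition, count) -> sb.append(metricName)
                .append("{partition=\"").append(partition).append("\"} ")
                .append(count).append('\n'));
        return sb.toString();
    }
}
```

Note that transform() only runs when a record arrives, so a partition that receives no traffic keeps reporting its last recorded value.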

Upvotes: 2
