NikeAGoGo

Reputation: 65

Periodically polling consumer metrics with Reactor Kafka

We have a Spring Boot project using Reactor Kafka, with a KafkaReceiver for consuming, and we would like to collect and emit the underlying consumer metrics. It looks like we could leverage KafkaReceiver.doOnConsumer() with something like this:

receiver.doOnConsumer(Consumer::metrics)
    .flatMapIterable(Map::entrySet)
    .map(m -> Tuples.of(m.getKey(), m.getValue()))

If this is the best approach, I'm not sure what the best way to run it periodically would be.
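
One idea we've considered, as a sketch only: drive the snapshot from a timer with Flux.interval. Note that doOnConsumer() can only act on the consumer while the receiver is active, and the 30-second period and logging sink below are placeholders, not settled choices:

Flux.interval(Duration.ofSeconds(30))
    .flatMap(tick -> receiver.doOnConsumer(Consumer::metrics)) // snapshot of the consumer's metrics map
    .flatMapIterable(Map::entrySet)
    .subscribe(e -> LOG.info("{} = {}", e.getKey().name(), e.getValue().metricValue()));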

I notice there's also a version of the KafkaReceiver.create() factory method that takes a custom ConsumerFactory; maybe there's some way to use that to register the underlying Kafka consumer with Micrometer at creation time? I'm new to Spring Boot and relatively new to Reactor Kafka, so I'm not totally sure.
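
Something along these lines is what we had in mind, as a sketch only: it assumes Micrometer's KafkaClientMetrics binder is on the classpath, and it subclasses reactor-kafka's ConsumerFactory, which lives in an internals package, so the createConsumer(ReceiverOptions) override point may vary by version:

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import org.apache.kafka.clients.consumer.Consumer;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.internals.ConsumerFactory;

// Binds every consumer this factory creates to a Micrometer registry,
// so the client's metrics are registered once, at creation time.
class MicrometerConsumerFactory extends ConsumerFactory {
    private final MeterRegistry meterRegistry;

    MicrometerConsumerFactory(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @Override
    public <K, V> Consumer<K, V> createConsumer(ReceiverOptions<K, V> options) {
        Consumer<K, V> consumer = super.createConsumer(options);
        new KafkaClientMetrics(consumer).bindTo(meterRegistry); // registers kafka.consumer.* meters
        return consumer;
    }
}

The factory would then be passed in as KafkaReceiver.create(new MicrometerConsumerFactory(registry), options).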

Here's a snippet of my code so far for more context:

KafkaReceiver.create(receiverOptions(Collections.singleton(topic)).commitInterval(Duration.ZERO))
    .receive()
    .groupBy(m -> m.receiverOffset().topicPartition())
    .flatMap(partitionFlux -> partitionFlux.publishOn(this.scheduler)
        .map(r -> processEvent(partitionFlux.key(), r))
        .concatMap(this::commit))
    .doOnCancel(this::close)
    .doOnError(error -> LOG.error("An error was encountered", error))
    .blockLast();

If taking the doOnConsumer() approach makes sense, we could possibly hook into doOnNext(), but then we'd be collecting and emitting metrics for every event, which is too much; it would be better if we could stagger and batch the collection.
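
For staggering without a separate timer, one option we've sketched is Flux.sample, which would throttle the snapshots to at most one per interval while still processing every record (processRecord and emitMetrics here are hypothetical handlers, not from our code):

receiver.receive()
    .doOnNext(this::processRecord)                            // every record is still processed
    .sample(Duration.ofSeconds(30))                           // but at most one emission per 30s flows on
    .concatMap(r -> receiver.doOnConsumer(Consumer::metrics))
    .subscribe(this::emitMetrics);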

Any suggestions or tips appreciated, thanks.

Upvotes: 3

Views: 730

Answers (1)

Daniel Santana

Reputation: 194

When processing Kafka messages with receive(), you handle each ConsumerRecord individually. If you prefer to process messages in batches and collect metrics at the batch level, you have two options: receiveAutoAck, which acknowledges records automatically, or receiveBatch, which lets you acknowledge or commit them manually. Either way, you can perform actions both at the batch level and for each individual record.

Example using receiveAutoAck:

ReceiverOptions<String, String> options = receiverOptions.subscription(Collections.singleton(topic));
ObservationRegistry observationRegistry = receiverOptions.observationRegistry();
KafkaReceiver.create(options)
    .receiveAutoAck()
    .flatMap(recordFlux -> this.processBatchActions(recordFlux, observationRegistry))
    .concatMap(record -> this.processMessageActions(record, observationRegistry))
    .doOnCancel(this::close)
    .doOnError(error -> LOG.error("An error was encountered", error))
    .subscribe(); // subscribe without blocking the reactive pipeline

Methods to process batches and metrics:

private Flux<ConsumerRecord<String, String>> processBatchActions(Flux<ConsumerRecord<String, String>> recordFlux, ObservationRegistry observationRegistry) {
    Observation batchObservation = Observation.createNotStarted("kafka.batch.receiver", observationRegistry)
        .contextualName("NameYourBatchReceiverObservation")
        .lowCardinalityKeyValue("bootstrap.servers", receiverOptions.bootstrapServers())
        .start();

    // Do your things for each batch
    return recordFlux
        .doOnComplete(batchObservation::stop)
        .doOnError(error -> {
          batchObservation.error(error);
          batchObservation.stop();
        })
        .doOnNext(record -> {
          batchObservation.event(() -> "Processing record: " + record.key());
        });
}

// If you decide to collect metrics for each record
private Mono<Void> processMessageActions(ConsumerRecord<String, String> record, ObservationRegistry observationRegistry) {
    Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION
        .start(null, KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE, () ->
                new KafkaRecordReceiverContext(record, "user.receiver", receiverOptions.bootstrapServers()), observationRegistry);
    // Do your things for each message, then stop the observation
    return Mono.<Void>fromRunnable(() -> handleRecord(record)) // handleRecord is your own processing logic
        .doOnError(receiverObservation::error)
        .doOnTerminate(receiverObservation::stop);
}

Example using receiveBatch:

ReceiverOptions<String, String> options = receiverOptions.subscription(Collections.singleton(topic));
ObservationRegistry observationRegistry = receiverOptions.observationRegistry();
KafkaReceiver.create(options)
    .receiveBatch()
    .flatMap(recordFlux -> this.processBatchActions(recordFlux, observationRegistry))
    .concatMap(record -> this.processMessageActions(record, observationRegistry))
    .doOnCancel(this::close)
    .doOnError(error -> LOG.error("An error was encountered", error))
    .subscribe(); // subscribe without blocking the reactive pipeline

Methods to process batches and metrics:

private Flux<ReceiverRecord<String, String>> processBatchActions(Flux<ReceiverRecord<String, String>> recordFlux, ObservationRegistry observationRegistry) {
    Observation batchObservation = Observation.createNotStarted("kafka.batch.receiver", observationRegistry)
        .contextualName("NameYourBatchReceiverObservation")
        .lowCardinalityKeyValue("bootstrap.servers", receiverOptions.bootstrapServers())
        .start();

    // Do your things for each batch
    return recordFlux
        .doOnComplete(batchObservation::stop)
        .doOnError(error -> {
          batchObservation.error(error);
          batchObservation.stop();
        })
        .doOnNext(record -> {
          batchObservation.event(() -> "Processing record: " + record.key());
        });
}

// If you decide to collect metrics for each record
private Mono<Void> processMessageActions(ReceiverRecord<String, String> record, ObservationRegistry observationRegistry) {
    Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION
        .start(null, KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE, () ->
            new KafkaRecordReceiverContext(record, "user.receiver", receiverOptions.bootstrapServers()), observationRegistry);
    // Do your things for each message, then stop the observation
    receiverObservation.stop();
    record.receiverOffset().acknowledge(); // marks the record as processed for the next scheduled commit
    return record.receiverOffset().commit(); // or commit explicitly right away if necessary
}

When using receiveAutoAck, offset commits happen automatically after each batch completes, paced by the configured commitInterval and commitBatchSize. With receiveBatch, you control commits yourself: acknowledge each record and let commitInterval and commitBatchSize drive the actual commits, or commit offsets explicitly as needed.
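
As a reference point, a sketch of tuning that cadence on the options (the values here are illustrative, not recommendations):

ReceiverOptions<String, String> tuned = receiverOptions
    .subscription(Collections.singleton(topic))
    .commitInterval(Duration.ofSeconds(5)) // commit acknowledged offsets at most every 5 seconds
    .commitBatchSize(100);                 // or as soon as 100 records have been acknowledged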

For more information, refer to the official documentation: https://projectreactor.io/docs/kafka/snapshot/reference/

Upvotes: 0
