Maverick

Reputation: 748

How to implement ConsumerSeekAware in Spring-kafka?

I am trying to implement a consumer with @KafkaListener. I am using Spring version 2.3.7.

Here is my code so far:

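import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class SampleListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleListener.class);

    @KafkaListener(topics = "test-topic",
            containerFactory = "sampleKafkaListenerContainerFactory",
            groupId = "test-group")
    public void onMessage(@Payload String message,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long receivedTimestamp,
                          @Header(KafkaHeaders.OFFSET) long offset,
                          @Headers MessageHeaders messageHeaders) {

        LOGGER.info("Received Message for topic={} partition={} offset={} messageHeaders={}",
                topic, partition, offset, messageHeaders);
        LOGGER.debug("Received Message payload={}", message);
        doSomething(message);
    }

    private void doSomething(String message) {
        // placeholder: the actual processing is not shown in the question
    }
}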

I am new to Kafka and Spring. I read the spring-kafka docs on how to seek to offsets, but I was not able to understand them completely.

As I understand it, for my use case I don't want to read events again when partitions are assigned to a container, or in any other scenario (ensuring each event is read only once).

I see that most consumer implementations implement ConsumerSeekAware. I know that implementing ConsumerSeekAware gives us the ability to seek to an offset on events such as onIdleContainer or onPartitionsAssigned, but I don't understand which scenarios these are meant to handle.

  1. Which scenarios is ConsumerSeekAware meant to handle? What are the best practices, or the common scenarios when implementing a Kafka consumer, that require seeking to an offset?

  2. What is the difference between registerSeekCallback and onPartitionsAssigned? The docs say both are called whenever partitions are assigned. How do the callbacks passed to these two methods differ?

Upvotes: 3

Views: 8489

Answers (1)

Gary Russell

Reputation: 174759

Implementing ConsumerSeekAware allows you to:

a. Seek to a specific offset (or to the beginning, the end, or an offset represented by a timestamp) during initialization.

b. Perform seeks at any time during the lifecycle of the application.
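To make "an offset represented by a timestamp" concrete, here is a toy model in plain Java with no Kafka or Spring dependency (the class TimestampIndex and its method offsetForTime are hypothetical names). It mimics the rule documented for KafkaConsumer.offsetsForTimes(): the target is the earliest offset whose timestamp is greater than or equal to the requested time. It assumes the timestamps in the partition are non-decreasing, which is what makes the binary search valid.

```java
import java.util.OptionalLong;

// Toy model, not the Kafka client: index i holds the timestamp of the record at offset i.
final class TimestampIndex {

    private final long[] timestamps; // assumed non-decreasing

    TimestampIndex(long... timestamps) {
        this.timestamps = timestamps.clone();
    }

    /** Earliest offset whose timestamp is >= time, or empty if every record is older. */
    OptionalLong offsetForTime(long time) {
        int lo = 0;
        int hi = timestamps.length; // binary search over the half-open range [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < time) {
                lo = mid + 1; // answer lies to the right of mid
            } else {
                hi = mid;     // mid qualifies; look for an earlier one
            }
        }
        return lo < timestamps.length ? OptionalLong.of(lo) : OptionalLong.empty();
    }
}
```

For example, with timestamps 100, 200, 200, 300, asking for time 250 gives offset 3, and asking for 400 gives an empty result (the real offsetsForTimes() returns null for that partition in this case).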

The preferred technique is to extend AbstractConsumerSeekAware if possible because it takes care of much of the underlying complexity.

If you don't need to seek, then you don't need to implement the interface (or extend the abstract class).

My understanding, for my use case I don't want to read the events again when partitions are assigned to a container or in any other scenarios (Ensuring read only once).

The container automatically commits the offsets for you. By default it commits after all the records returned by a poll() have been processed; you can set the container's AckMode property to RECORD to commit the offset after each record is processed instead.
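As a sketch of the AckMode setting, the container factory named in the question (sampleKafkaListenerContainerFactory) could be configured as below. It assumes a spring-kafka version that provides ContainerProperties.AckMode (2.3 or later) and that a ConsumerFactory<String, String> bean is defined elsewhere in the application; the class name KafkaConsumerConfig is hypothetical.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaConsumerConfig {

    // Bean name matches containerFactory = "sampleKafkaListenerContainerFactory" on the listener.
    // Assumes a ConsumerFactory<String, String> bean is defined elsewhere.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> sampleKafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Commit after each record instead of after the whole poll() batch (the default, BATCH).
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        return factory;
    }
}
```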

The next time you start the application, it will start consuming from the last committed offset(s).
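The toy model below (plain Java, not spring-kafka code; CommitMode and CommitSimulation are hypothetical names) shows where a restarted consumer resumes if the process dies partway through a poll() batch. A committed offset is the offset of the next record to read, i.e. the last processed offset + 1. With a batch-level commit, records processed before the crash are delivered again; with per-record commits, only records whose commit had not happened yet are. Either way this is at-least-once delivery, so per-record commits reduce duplicates rather than ruling them out.

```java
import java.util.List;

// Hypothetical stand-ins for the container's AckMode.BATCH and AckMode.RECORD.
enum CommitMode { BATCH, RECORD }

// Toy model, not spring-kafka code.
final class CommitSimulation {

    /**
     * Processes the offsets of one poll() batch, "crashing" just before the record at
     * crashAtIndex (pass -1 for no crash), and returns the offset a restart resumes from.
     */
    static long resumeOffsetAfterCrash(long committedBeforePoll, List<Long> batch,
                                       int crashAtIndex, CommitMode mode) {
        long committed = committedBeforePoll;
        for (int i = 0; i < batch.size(); i++) {
            if (i == crashAtIndex) {
                return committed; // process dies: nothing further is committed
            }
            long offset = batch.get(i);
            // ... the record at `offset` would be processed here ...
            if (mode == CommitMode.RECORD) {
                committed = offset + 1; // commit after every record
            }
        }
        if (!batch.isEmpty()) {
            // End-of-batch commit (what BATCH relies on; RECORD already has this value).
            committed = batch.get(batch.size() - 1) + 1;
        }
        return committed;
    }
}
```

For a batch of offsets 10 to 14 with a crash before offset 13 is processed, BATCH resumes at 10 (offsets 10 to 12 are read again) while RECORD resumes at 13.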

2.

onPartitionsAssigned is called when partitions are assigned (initially or after a rebalance). If you perform seeks there, they are performed on the consumer directly, during the rebalance.
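A minimal toy model of that behaviour in plain Java (RebalanceModel and its methods are hypothetical, not Kafka or Spring APIs): the assignment listener runs on the consumer's own thread while the rebalance is in progress, so a seek made there takes effect before any records are fetched. In spring-kafka the corresponding hook is onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy model, not the Kafka client: partition -> position of the next record to fetch.
final class RebalanceModel {

    private final Map<Integer, Long> positions = new HashMap<>();

    /**
     * Hypothetical rebalance: each assigned partition starts at its committed offset,
     * then the assignment listener is invoked synchronously on this (the consumer's) thread.
     */
    void rebalance(Map<Integer, Long> committed,
                   BiConsumer<RebalanceModel, Map<Integer, Long>> onAssigned) {
        positions.clear();
        positions.putAll(committed);
        onAssigned.accept(this, committed); // seeks made here apply immediately
    }

    /** Direct seek: only safe on the consumer thread, e.g. inside the listener above. */
    void seek(int partition, long offset) {
        positions.put(partition, offset);
    }

    /** Offset the next poll() would start from for this partition. */
    long position(int partition) {
        return positions.get(partition);
    }
}
```

If partition 0 had 42 committed and the listener seeks it to 0, the next poll starts partition 0 at offset 0, while a partition the listener leaves alone keeps its committed offset.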

registerSeekCallback is called to give the application a handle to a callback that can be invoked at any arbitrary time in the future. If the container has concurrency > 1, multiple callbacks are registered (one per consumer thread). When you perform seeks on these callbacks, they are queued for the consumer thread to invoke just before the next poll (the consumer is not thread-safe). The abstract class manages this for you and allows a higher level of abstraction, for example:

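import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;

// The bean holding the @KafkaListener method must extend AbstractConsumerSeekAware
// so that the container registers its seek callbacks with it.
@Component
public class SampleListener extends AbstractConsumerSeekAware {

    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void onMessage(String message) {
        // process the record
    }

    /**
     * Rewind all partitions one record.
     */
    public void rewindAllOneRecord() {
        getSeekCallbacks()
            .forEach((tp, callback) ->
                callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
     * Rewind one partition one record.
     */
    public void rewindOnePartitionOneRecord(String topic, int partition) {
        getSeekCallbackFor(new TopicPartition(topic, partition))
            .seekRelative(topic, partition, -1, true);
    }
}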
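The queuing described above can be modelled in plain Java as follows. This is a sketch of the idea, not spring-kafka's implementation, and QueuedSeekModel is a hypothetical name: a seekRelative() request made from another thread only lands in a thread-safe queue, and the single consumer thread applies it just before its next poll. A delta of -1 relative to the current position re-reads the last record, like seekRelative(topic, partition, -1, true) in the methods above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy model, not spring-kafka internals.
final class QueuedSeekModel {

    // Touched only by the consumer thread, mirroring the non-thread-safe KafkaConsumer.
    private final Map<Integer, Long> positions = new HashMap<>();
    // Safe for any thread; each entry is {partition, delta}.
    private final Queue<long[]> pendingRelativeSeeks = new ConcurrentLinkedQueue<>();

    QueuedSeekModel(Map<Integer, Long> initialPositions) {
        positions.putAll(initialPositions);
    }

    /** Callable from any thread: only queues the request. */
    void seekRelative(int partition, long delta) {
        pendingRelativeSeeks.add(new long[] {partition, delta});
    }

    /** Consumer thread: apply queued seeks, then "fetch" one record; returns its offset. */
    long pollOne(int partition) {
        long[] seek;
        while ((seek = pendingRelativeSeeks.poll()) != null) {
            int p = (int) seek[0];
            // This toy clamps at 0 (an assumption, not documented client behaviour).
            positions.put(p, Math.max(0L, positions.get(p) + seek[1]));
        }
        long offset = positions.get(partition);
        positions.put(partition, offset + 1);
        return offset;
    }

    /** Current position; call from the consumer thread. */
    long position(int partition) {
        return positions.get(partition);
    }
}
```

Starting at position 10, a seekRelative(0, -1) issued from another thread leaves position(0) at 10 until the next pollOne(0), which applies the seek and returns 9.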

In the upcoming 2.6.0 release (due this week), it's even easier, with the methods seekToBeginning(), seekToEnd() and seekToTimestamp(long time), which queue seeks for all assigned partitions.

Upvotes: 5
