Nitish

Reputation: 21

How to seek a particular offset in a Kafka listener method?

I am trying to seek an offset from a SQL database in my Kafka listener method. I have used the registerSeekCallback method in my code, but this method is only invoked when the consumer runs (i.e. when the container is started). Let's say my consumer is running and the last committed offset is 20 in the MySQL database. If I manually change the last committed offset in the MySQL database to 11, my consumer will keep reading from 21 unless I restart the consumer (container restarted). I am looking for a way to override or seek the offset in my listener method itself. Any help would be appreciated.

public class Listen implements ConsumerSeekAware 
{
    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback)
    {
        // fetching the offset from a database
        Integer offset = offsetService.getOffset();
        callback.seek("topic-name", 0, offset);
    }

    @KafkaListener(topics = "topic-name", groupId = "group")
    public void listen(ConsumerRecord record, Acknowledgment acknowledgment) throws Exception 
    {
        // processing the record

        acknowledgment.acknowledge();    // manually committing the record
        // committing the offset to the MySQL database
    }
}

Edit: with the new listener method:

@KafkaListener(topics = "topic-name", groupId = "group")
public void listen(ConsumerRecord record, Acknowledgment acknowledgment,
        @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) throws Exception {
    // seeking the old offset stored in the database (which is 11)
    consumer.seek(new TopicPartition("topic-name", 0), offsetService.getOffset());
    log.info("record offset is {} and value is {}", record.offset(), record.value());
    acknowledgment.acknowledge();
}

In the database my last committed offset is 11, and the last committed offset on the Kafka end is 21. When I write a new record to the Kafka topic (i.e. at offset 22), my consumer triggers and processes offset 22 first, then goes back, seeks offset 11 and starts processing from there. Why is it consuming offset 22 first although I am seeking offset 11?

With my above code, every time I write a new message to my Kafka topic it processes that record first and only then seeks the offset stored in my database. Is there any way I can avoid that?

Upvotes: 2

Views: 10018

Answers (2)

Sumit Sood

Reputation: 485

Starting with Spring Kafka version 2.5.5, we can apply an initial offset to all assigned partitions:

@KafkaListener(groupId = "group_json", containerFactory = "userKafkaListenerFactory", topicPartitions = {
        @org.springframework.kafka.annotation.TopicPartition(topic = "Kafka_Topic", partitions = {"0"},
                partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "3")),
        @org.springframework.kafka.annotation.TopicPartition(topic = "Kafka_Topic_2", partitions = {"0"},
                partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "4"))
})
public void consumeJson(User user, ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment) throws Exception {
    // Reading the message into a String variable.
    String message = consumerRecord.value().toString();
}

Source: https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#reference

Upvotes: -1

Gary Russell

Reputation: 174554

There are several techniques in this answer.

Bear in mind that performing a seek on the consumer will not take effect until the next poll (any records fetched on the last poll will be sent to the consumer first).
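
As a minimal sketch of what that means for the code in the question (offsetService and the topic/group names are carried over from the question; the reseekRequested flag is a hypothetical addition that would be set whenever the offset in the database changes), the record that was already fetched can simply be skipped while the consumer is repositioned:

// Hypothetical sketch, not the question's actual code.
private final AtomicBoolean reseekRequested = new AtomicBoolean(true);

@KafkaListener(topics = "topic-name", groupId = "group")
public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment,
        @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {

    if (reseekRequested.compareAndSet(true, false)) {
        // The seek only changes where the NEXT poll starts; the record we were just
        // handed (offset 22 in the question) was already fetched, so skip it here.
        // It is not acknowledged and will be redelivered after the consumer rewinds.
        consumer.seek(new TopicPartition(record.topic(), record.partition()),
                offsetService.getOffset());
        return;
    }
    // normal processing
    acknowledgment.acknowledge();
}
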

EDIT

Here's an example:

@SpringBootApplication
public class So63429201Application {

    public static void main(String[] args) {
        SpringApplication.run(So63429201Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, Listener listener) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("so63429201", i % 3,  null, "foo" + i));
            Thread.sleep(8000);
            listener.seekToTime(System.currentTimeMillis() - 11000);
            Thread.sleep(8000);
            listener.seekToOffset(new TopicPartition("so63429201", 0), 11);
            Thread.sleep(8000);
        };
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63429201").partitions(3).replicas(1).build();
    }

}

@Component
class Listener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "so63429201", topics = "so63429201", concurrency = "2")
    public void listen(String in) {
        System.out.println(in);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        System.out.println(assignments);
        super.onPartitionsAssigned(assignments, callback);
        callback.seekToBeginning(assignments.keySet());
    }

    public void seekToTime(long time) {
        getSeekCallbacks().forEach((tp, callback) -> callback.seekToTimestamp(tp.topic(), tp.partition(), time));
    }

    public void seekToOffset(TopicPartition tp, long offset) {
        getSeekCallbackFor(tp).seek(tp.topic(), tp.partition(), offset);
    }

}
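
Because the listener extends AbstractConsumerSeekAware, the seek callback for each assigned partition is retained, which is what allows seekToTime() and seekToOffset() above to be called later from another bean (for example, whenever the offset stored in the database changes) without restarting the container.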

Upvotes: 3
