user3777801
user3777801

Reputation: 101

Spring Boot Job scheduler with Kafka consumer

I am working on a POC where I want to consume messages from a Kafka topic "users". Trying to achieve that consumer should read message from Kafka topic once spring boot scheduler triggers on the scheduled time or cron time then we should start consuming the existing messages one by one from the kafka topic and process those and when all the messages consumed then kafka consumer should stop. scheduler should trigger on the cron time and start the process again.

I have tried the below to achieve this though I am struggling to identify how to call consume(String message) method from my scheduler method schedularMsgConsumeKakfa and any example where we have better structure to consume messages from Kafka in the spring boot scheduler and write kafka consumer method in the class implementing tasklet interface etc. appreciating for any suggestions.

@Configuration
@Service
public class SchedularMsgConsumeKafkaController2 {

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @KafkaListener(topics = "users", id = "full-part-id", containerGroup = "full-part-group", autoStartup = "false")
    public void consume(String message) throws IOException, InterruptedException {
        System.out.println(String.format("#### -> Consumed message -> %s", message));
        System.out.println(String.format("Really happy"));
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id");
        listenerContainer.stop();
    }

    @Scheduled(fixedDelay = 30000, initialDelay = 15000)
    public void schedularMsgConsumeKakfa() throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id");
        listenerContainer.start();
    }

}

Upvotes: 4

Views: 8443

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121272

First of all @Scheduled is not a Spring Boot feature. It is native to Spring Framework: https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#scheduling.

Second: the @KafkaListener is a part of Spring for Apache Kafka project: https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-listener-annotation.

Therefore both features can be used outside of Spring Boot framework. I know this is not related to the question, but it would be better to call things with their proper names.

The mix of @KafkaListener & @Scheduled is not so good solution since you are mixing many not compatible concerns.

It is better to look to some different solution where you don't need to deal with start()/stop() and would not overhead the environment with extra threads.

Consider to look into Spring Integration and its KafkaMessageSource implementation: https://docs.spring.io/spring-integration/reference/html/kafka.html#kafka-inbound-pollable. It is probably something new for your to learn, but it worth that since you are not going to do too many stuff yourself.

With "yourself" I mean the usage of ConsumerFactory API and calling KafkaConsumer.poll() manually from your @Scheduled.

There is probably no such a sample for @Scheduled solution because it is really going to be sophisticated enough. And there is no sample for KafkaMessageSource in Spring Integration since it is pretty simple and lands only in the matter to configure an appropriate channel adapter.

Upvotes: 3

Related Questions