AngryBirdLearning

Reputation: 9

Kafka listener poll interval: how to schedule Kafka consumer poll() at a 15-minute interval

How can I schedule poll() to run every 15 minutes in a Kafka listener?

My sample code with a 5-minute poll interval works fine, but my requirement is to call poll() at 15-minute intervals.

public class KafkaConsumerConfig {
    private final String SERVERS = "localhost:9092";

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 900000);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setIdleEventInterval(60000L);
        return factory;
    }

    // groupId sets the consumer group; the older "group" attribute does not.
    @KafkaListener(id = "batch-listener", topics = TOPIC_01, groupId = "test-consumer-group",
            containerFactory = "kafkaListenerContainerFactory")
    public void messageListener(List<String> messages) {
        log.info("Received {} messages", messages.size());
        messages.forEach(message ->
                log.info("MessageListener received message: {}", message));
    }

}

Upvotes: 0

Views: 7599

Answers (2)

AngryBirdLearning

Reputation: 9

Yes, it works with:

props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000000);
factory.getContainerProperties().setIdleBetweenPolls(900000);

Polling messages as a batch with poll() now works as expected.
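For anyone landing here, this is a rough sketch of how those two settings might sit together in one configuration class. It assumes Spring for Apache Kafka 2.3 or later (where, as far as I know, setIdleBetweenPolls was introduced), and the class name SlowPollingConsumerConfig is made up for illustration. The broker address and group id are copied from the question.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Hypothetical class name; the settings themselves come from this thread.
@Configuration
@EnableKafka
public class SlowPollingConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        // Must be larger than the idle time below plus the batch processing time,
        // otherwise the consumer is considered failed and the group rebalances.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1_000_000);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        // Sleep 15 minutes (900000 ms) on the consumer thread between poll() calls.
        factory.getContainerProperties().setIdleBetweenPolls(900_000L);
        return factory;
    }
}
```

The listener method itself does not need to change; only the container's polling rhythm does.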

Thank you, @Gary.

Upvotes: 0

Gary Russell

Reputation: 174484

Your requirement is not clear. If your current setting is

props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); 

then change it to 900000 if you need that much time to process your 10 records.
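To make this first reading concrete: max.poll.interval.ms is the longest the consumer may go between poll() calls before the group rebalances, so a whole batch has to be processed within it. Below is a minimal sketch of that arithmetic in plain Java. The class and method names are hypothetical, and the 5000 figure is the max.poll.records value shown in the question, used here only as an example.

```java
// Hypothetical helper, not part of Kafka or Spring: back-of-the-envelope
// check of how much time each record may take on average.
class PollBudget {

    /**
     * Largest average per-record processing time (in ms) that still lets a
     * full batch finish before max.poll.interval.ms expires.
     */
    public static long maxMillisPerRecord(long maxPollIntervalMs, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        return maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        // 15-minute interval, 5000 records per poll
        System.out.println(maxMillisPerRecord(900_000L, 5000)); // prints 180
        // 5-minute interval, 5000 records per poll
        System.out.println(maxMillisPerRecord(300_000L, 5000)); // prints 60
    }
}
```

If the real per-record time is well under that number, raising max.poll.interval.ms is probably not what you need.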

If you mean you only want to fetch records every 15 minutes, use

props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000000); 

and

factory.getContainerProperties().setIdleBetweenPolls(900000);
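The reason max.poll.interval.ms is set higher than the idle time: as I understand the Spring for Apache Kafka documentation, the time to process a batch plus idleBetweenPolls must stay below max.poll.interval.ms. A small plain-Java sketch of that check follows. The class and method names are hypothetical, and the 60000 ms processing estimate is only an example value.

```java
// Hypothetical helper, not part of Kafka or Spring: checks whether an idle
// time and an estimated processing time fit inside max.poll.interval.ms.
class IdleBetweenPollsCheck {

    /** Time left for processing a batch once the idle sleep is accounted for. */
    public static long processingHeadroomMs(long maxPollIntervalMs, long idleBetweenPollsMs) {
        return maxPollIntervalMs - idleBetweenPollsMs;
    }

    /** True if idle time plus estimated processing time stays below the limit. */
    public static boolean fits(long maxPollIntervalMs, long idleBetweenPollsMs,
                               long estimatedProcessingMs) {
        return idleBetweenPollsMs + estimatedProcessingMs < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // Values from the answer: 1000000 ms limit, 900000 ms idle.
        System.out.println(processingHeadroomMs(1_000_000L, 900_000L)); // prints 100000
        System.out.println(fits(1_000_000L, 900_000L, 60_000L));        // prints true
        // Idle time equal to the limit leaves no room for processing.
        System.out.println(fits(900_000L, 900_000L, 1_000L));           // prints false
    }
}
```

With the values above there are 100 seconds of headroom per batch; if processing can take longer than that, raise max.poll.interval.ms further.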

Upvotes: 2
