Swati Gupta

Reputation: 73

Spring Boot with Apache Kafka: Messages not being read

I am setting up a Spring Boot application with a Kafka listener. I am only writing the consumer for now; for the producer, I am manually sending messages from the Kafka console. I followed this example: http://www.source4code.info/2016/09/spring-kafka-consumer-producer-example.html

I tried running this as a Spring Boot application, but I am not able to see any messages being received, even though there are already some messages in my local Kafka topic.

C:\software\kafka_2.11-0.10.1.0\kafka_2.11-0.10.1.0\kafka_2.11-0.10.1.0\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test
this is a message
testing again

My Spring Boot application is:

@EnableDiscoveryClient
@SpringBootApplication
public class KafkaApplication {

    /**
     * Run the application using Spring Boot and an embedded servlet engine.
     * 
     * @param args
     *            Program arguments - ignored.
     */
    public static void main(String[] args) {
        // Tell server to look for registration.properties or registration.yml
        System.setProperty("spring.config.name", "kafka-server");

        SpringApplication.run(KafkaApplication.class, args);
    }
}

And Kafka configuration is:

package kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //factory.setConcurrency(1);
        //factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        //propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        //propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}

And Kafka listener is:

package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;

public class Listener {

    protected Logger logger = Logger.getLogger(Listener.class
            .getName());

    public CountDownLatch getCountDownLatch1() {
        return countDownLatch1;
    }

    private CountDownLatch countDownLatch1 = new CountDownLatch(1);

    @KafkaListener(topics = "test")
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("Received message: " + record);
        System.out.println("Received message: " + record);
        countDownLatch1.countDown();
    }
}

I am trying this for the first time. Please let me know if I am doing anything wrong. Any help will be greatly appreciated.

Upvotes: 4

Views: 14818

Answers (4)

ShareNCare

Reputation: 1

The above suggestions are good. If you have followed all of them and it still does not work, check whether lazy initialization is enabled for your application.

Lazy initialization is false by default. However, if your application has an explicit setting like the one below:

spring.main.lazy-initialization=true

comment it out or set it to false.
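For example, a minimal application.properties (assuming the default property file name; the question overrides it to kafka-server) would look like this:

```properties
# Keep lazy initialization off so @KafkaListener containers start eagerly
spring.main.lazy-initialization=false
```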

Upvotes: -1

John Senga

Reputation: 11

You need to annotate your Listener class with either @Service or @Component so that Spring Boot registers the Kafka listener.

package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;

@Component
public class Listener {

    protected Logger logger = Logger.getLogger(Listener.class
            .getName());

    public CountDownLatch getCountDownLatch1() {
        return countDownLatch1;
    }

    private CountDownLatch countDownLatch1 = new CountDownLatch(1);

    @KafkaListener(topics = "test")
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("Received message: " + record);
        System.out.println("Received message: " + record);
        countDownLatch1.countDown();
    }
}

Upvotes: 1

NangSaigon

Reputation: 1263

I observed that you did comment out the consumer group.id property:

 //propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

Here is how it is described in the official Kafka documentation:

A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.

After uncommenting that row, the consumer worked.

Upvotes: 1

Chin Huang

Reputation: 13860

You did not set ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, so the default is "latest". Set it to "earliest" so the consumer will receive messages already in the topic.

ConsumerConfig.AUTO_OFFSET_RESET_CONFIG takes effect only if the consumer group does not already have an offset for a topic partition. If you already ran the consumer with the "latest" setting, then running the consumer again with a different setting does not change the offset. The consumer must use a different group so Kafka will assign offsets for that group.
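Putting the two fixes together, a sketch of a corrected consumerConfigs() might look like the following. It uses plain string keys, which are the values the ConsumerConfig constants resolve to (e.g. ConsumerConfig.GROUP_ID_CONFIG is "group.id"); the group name "group1" is just the placeholder from the question.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerProps {

    // Keys are the string values behind the ConsumerConfig constants.
    static Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Required for @KafkaListener, which uses subscribe()-based group management.
        props.put("group.id", "group1");
        // Only applies when this group has no committed offset for the partition.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerConfigs());
    }
}
```

Note that if you had already run the consumer once with "latest", you would also need to change "group1" to a fresh group name for "earliest" to take effect, as explained above.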

Upvotes: 5
