user1052610

Reputation: 4719

Unable to consume Kafka messages within Spring Boot

We have a Java application which consumes Kafka messages using org.apache.kafka.clients.consumer.KafkaConsumer.

We have created a Spring Boot application with a Spring-Kafka dependency, but are unable to read the messages within the new project. We have checked the obvious parameters, including the hostname and port of the bootstrap servers (which the logs show are recognized), the group, the topic, and that Spring Boot, like the original consumer, uses StringDeserializer. Here is our configuration file:

spring:
  kafka:
    bootstrap-servers: hostname1:9092,hostname2:9092
    consumer:
      auto-offset-reset: earliest
      group-id: our_group
      enable-auto-commit: false
      fetch-max-wait: 500
      max-poll-records: 1

kafka:
  topic:
    boot: topic.name 

and the receiver:

@Component
public class Receiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "${kafka.topic.boot}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        latch.countDown();
    }

}

Here is the code to start the Boot application:

@SpringBootApplication
public class EmsDemoUsingSpringBootApplication {

    public static void main(String[] args) {
        SpringApplication.run(EmsDemoUsingSpringBootApplication.class, args);
    }
}

This exception is being caught:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Is there anything obvious I have overlooked? What is the best way to debug this?

Thanks

Upvotes: 0

Views: 13853

Answers (2)

WesternGun

Reputation: 12728

I had this problem too; in my case the cause was that I could not connect to the server. You can raise the log level in application.properties or application.yml to see more details. The devil is in the logs.

logging:
  level:
    root: WARN
    org.springframework: INFO
    org.apache.kafka: DEBUG

I am told that Kafka is not able to handle name lookup and, in my experience, the host to connect to should almost always be an FQDN (with the domain name and all). In my case, I think I hadn't set the domain in my VirtualBox setup, so my guest box could not be found, even though we are in the same subnet and ping works.
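To rule out name resolution and connectivity before digging through the DEBUG logs, something like the following can be run from the machine that hosts the Boot application. This is only a rough sketch using the plain JDK: `BootstrapCheck`, `parse` and `canConnect` are made-up names for illustration, not part of Kafka or Spring, and the 3000 ms timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Kafka or Spring class): checks broker endpoints by hand.
public class BootstrapCheck {

    // Splits a value such as "host1:9092,host2:9092" into unresolved host/port pairs.
    public static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> endpoints = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            endpoints.add(InetSocketAddress.createUnresolved(host, port));
        }
        return endpoints;
    }

    // Returns true only if the host name resolves and a TCP connection opens within timeoutMs.
    public static boolean canConnect(InetSocketAddress endpoint, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Building a fresh InetSocketAddress triggers the DNS lookup here.
            socket.connect(new InetSocketAddress(endpoint.getHostString(), endpoint.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unknown hosts, refused connections and timeouts alike.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the same value as spring.kafka.bootstrap-servers; the default is a placeholder.
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        for (InetSocketAddress endpoint : parse(servers)) {
            System.out.println(endpoint.getHostString() + ":" + endpoint.getPort()
                    + " reachable=" + canConnect(endpoint, 3000));
        }
    }
}
```

A successful connection here only shows that the bootstrap address is reachable. The brokers answer metadata requests with their advertised host names, so those names have to resolve from the client machine as well.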

Also, I had created a separate main class for the Kafka part, and that turned out to be wrong. It is not good practice; you should annotate the application's main class with @EnableKafka and put the settings in the yml file, and they will be loaded. No need for another configuration class.

My consumer:

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaConsumer {
    @KafkaListener(topics={"testtopic"})
    public void listen(@Payload String message) {
        log.info("Received message is {}", message);
    }
}

My application:

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.kafka.annotation.EnableKafka;

@Slf4j
@SpringBootApplication(exclude = { SecurityAutoConfiguration.class })
@EnableKafka    // <----- I only had to add this line
public class SomeApplication {

    public static void main(String[] args) {
        SpringApplication.run(SomeApplication.class, args);
        log.info("Application launched. ");
    }
}

My config yml:

logging:
  level:
    root: WARN
    org.springframework: INFO
    org.apache.kafka: DEBUG

spring:
  kafka:
    bootstrap-servers: <FQDN names here:9092>
    consumer:
      group-id: <unique-group-id>
      enable-auto-commit: false # never ack a message automatically when it is received
    listener:
      ack-mode: manual # I am responsible for acking the messages

And launch the application. That's all.

Upvotes: 4

donm

Reputation: 1210

You may have missed the KafkaListenerContainerFactory bean, which needs to be defined in a @Configuration class:

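// consumerConfig (assumed here to be a Map<String, Object> of consumer properties) and
// concurrentConsumerCount (an int) are fields of the enclosing @Configuration class.
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaManualAckListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    // Copy the properties so later changes to consumerConfig do not affect the factory.
    factory.setConsumerFactory(
            new DefaultKafkaConsumerFactory<String, String>(new HashMap<String, Object>(consumerConfig)));
    factory.setConcurrency(concurrentConsumerCount);
    // Batch mode: the listener method should then accept a List of records rather than a single one.
    factory.setBatchListener(true);
    return factory;
}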

Also, reference the KafkaListenerContainerFactory on the consumer, like this:

@KafkaListener(topics = "${kafka.topic.boot}", containerFactory = "kafkaManualAckListenerContainerFactory")

Upvotes: 1
