Reputation: 4719
We have a Java application which consumes Kafka messages, using org.apache.kafka.clients.consumer.KafkaConsumer
We have created a Spring Boot application with a Spring-Kafka dependency, but are unable to read the messages within the new project. Have checked the obvious parameters, including hostname and port of the bootstrap servers (which the logs show are recognized), the group, the topic and that Spring Boot, like the original consumer, uses StringDeserializer
. Here is our configuration file:
spring:
kafka:
bootstrap-servers: hostname1:9092,hostname2:9092
consumer:
auto-offset-reset: earliest
group-id: our_group
enable-auto-commit: false
fetch-max-wait: 500
max-poll-records: 1
kafka:
topic:
boot: topic.name
and the receiver:
@Component
public class Receiver {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${kafka.topic.boot}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
latch.countDown();
}
}
Here is the code to start the Boot application:
@SpringBootApplication
public class EmsDemoUsingSpringBootApplication {
public static void main(String[] args) {
SpringApplication.run(EmsDemoUsingSpringBootApplication.class, args);
}
}
This exception is being caught:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
Is there anything obvious I have overlooked? What is the best way to debug this?
Thanks
Upvotes: 0
Views: 13853
Reputation: 12728
I had this problem too and what happens was that I could not connect to the server. You can change log level in the application.properties
or application.yml
to see more details. The demon is in the log..
logging:
level:
root: WARN
org.springframework: INFO
org.apache.kafka: DEBUG
I am told that Kafka is not able to handle name lookup and from my experience, the host to connect should almost always be FQDN names(with domain name and all). In my case, I think I haven't set the domain in my virtual box and it is not possible to find my guest box, even we are in the same subnet and ping
works.
Also, I create another main class for Kafka part and it turns out wrong. It is not good practice and you should annotate the application main class with @EnableKafka
and just put the settings in the yml file, and they should be loaded. No need for another configuration class.
My consumer:
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics={"testtopic"})
public void listen(@Payload String message) {
log.info("Received message is {}", message);
}
}
My application:
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.kafka.annotation.EnableKafka;
@Slf4j
@SpringBootApplication(exclude = { SecurityAutoConfiguration.class })
@EnableKafka // <----- I only had to add this line
public class SomeApplication {
public static void main(String[] args) {
SpringApplication.run(SomeApplication.class, args);
log.info("Application launched. ");
}
}
My config yml:
logging:
level:
root: WARN
org.springframework: INFO
org.apache.kafka: DEBUG
spring:
kafka:
bootstrap-servers: <FQDN names here:9092>
consumer:
group-id: <unique-group-id>
enable-auto-commit: false # never ack messsage when it is received.
listener:
ack-mode: manual # I am responsible to ack the messages
And launch the application. That's all.
Upvotes: 4
Reputation: 1210
I hope you may have missed KafkaListenerContainerFactory
bean which needs to be specified in @Configuration
file
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(new HashMap<String,Object>((Map)consumerConfig)));
factory.setConcurrency(concurrentConsumerCount);
factory.setBatchListener(true);
return factory;
}
Also for consumer specify KafkaListenerContainerFactory
like @KafkaListener(topics = ("${kafka.topic.boot}"), containerFactory = "kafkaManualAckListenerContainerFactory"
Upvotes: 1