Reputation: 2895
I'm using Spring Boot 2.2.0.M4 and Kafka 2.2.0 trying to build an application based on the sample at https://www.baeldung.com/spring-kafka. When I enable the listener for my topic, I get the following error on the consumer.
[AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
The following is defined in my application properties.
kafka.bootstrapAddress=172.22.22.55:9092
Here's the @KafkaListener annotated method.
@KafkaListener(topics = "add_app", groupId = "foo")
public void listen(String message) {
System.out.println("Received Message in group foo: " + message);
}
Below is the Consumer configuration class that is referencing the kafka.bootstrapAddress value. It is logged properly.
@Configuration
@Slf4j
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
log.info("Created {} using address {}.", this.getClass(), bootstrapAddress);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("foo"));
return factory;
}
Upvotes: 6
Views: 9119
Reputation: 175
In order to use your custom property kafka.bootstrapAddress
you need to create @Bean KafkaAdmin
. It has its own configuration class AdminClientConfig
which is by default configured to connect to 127.0.0.1:9092
. To override the configuration you have to use something like this:
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
Upvotes: 0
Reputation: 2895
The solution to this is fairly simple. I just needed to add the following to the application.properties file.
spring.kafka.bootstrap-servers=174.22.22.55:9092
After looking at KafkaProperties.java, I found this line:
private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
and this method actually builds them:
private Map<String, Object> buildCommonProperties() {
Map<String, Object> properties = new HashMap();
if (this.bootstrapServers != null) {
properties.put("bootstrap.servers", this.bootstrapServers);
}
if (this.clientId != null) {
properties.put("client.id", this.clientId);
}
properties.putAll(this.ssl.buildProperties());
if (!CollectionUtils.isEmpty(this.properties)) {
properties.putAll(this.properties);
}
return properties;
}
Since it's already predefined on the class, the broker initially defined on the KafkaConsumerConfig is not used.
Update
Adding the containerFactory attribute to the listener annotation also fixes it and removes the need for the change to application.properties.
@KafkaListener(topics = "add_app", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listen(String message) {
System.out.println("Received Message in group foo: " + message);
}
Upvotes: 6