Billy Bob Bain
Billy Bob Bain

Reputation: 2895

Kafka consumer can't connect to broker other than localhost:9092 using Spring Boot 2.2.0.M4

I'm using Spring Boot 2.2.0.M4 and Kafka 2.2.0 trying to build an application based on the sample at https://www.baeldung.com/spring-kafka. When I enable the listener for my topic, I get the following error on the consumer.

[AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

The following is defined in my application properties.

kafka.bootstrapAddress=172.22.22.55:9092

Here's the @KafkaListener annotated method.

@KafkaListener(topics = "add_app", groupId = "foo")
public void listen(String message) {
    System.out.println("Received Message in group foo: " + message);
}

Below is the Consumer configuration class that is referencing the kafka.bootstrapAddress value. It is logged properly.

@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    public ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        log.info("Created {} using address {}.", this.getClass(), bootstrapAddress);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("foo"));
        return factory;
    }

Upvotes: 6

Views: 9119

Answers (2)

macieg_b
macieg_b

Reputation: 175

In order to use your custom property kafka.bootstrapAddress you need to create @Bean KafkaAdmin. It has its own configuration class AdminClientConfig which is by default configured to connect to 127.0.0.1:9092. To override the configuration you have to use something like this:

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return new KafkaAdmin(configs);
}

Upvotes: 0

Billy Bob Bain
Billy Bob Bain

Reputation: 2895

The solution to this is fairly simple. I just needed to add the following to the application.properties file.

spring.kafka.bootstrap-servers=174.22.22.55:9092

After looking at KafkaProperties.java, I found this line:

private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));

and this method actually builds them:

private Map<String, Object> buildCommonProperties() {
        Map<String, Object> properties = new HashMap();
        if (this.bootstrapServers != null) {
            properties.put("bootstrap.servers", this.bootstrapServers);
        }

        if (this.clientId != null) {
            properties.put("client.id", this.clientId);
        }

        properties.putAll(this.ssl.buildProperties());
        if (!CollectionUtils.isEmpty(this.properties)) {
            properties.putAll(this.properties);
        }

        return properties;
    }

Since it's already predefined on the class, the broker initially defined on the KafkaConsumerConfig is not used.

Update

Adding the containerFactory attribute to the listener annotation also fixes it and removes the need for the change to application.properties.

@KafkaListener(topics = "add_app", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listen(String message) {
    System.out.println("Received Message in group foo: " + message);
}

Upvotes: 6

Related Questions