Bagira

Reputation: 2281

Bootstrap broker disconnected: not able to connect to Kafka

I have created a Kafka cluster on Confluent Cloud, but I am unable to connect to it. When I run the producer, I get the following error:

[Producer clientId=producer-1] Node -1 disconnected.
2023-06-05T06:09:20.826+05:30 INFO 25324 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Cancelled in-flight API_VERSIONS request with correlation id 189 due to node -1 being disconnected (elapsed time since creation: 253ms, elapsed time since send: 253ms, request timeout: 30000ms)
2023-06-05T06:09:20.827+05:30 WARN 25324 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker (id: -1 rack: null) disconnected

I tried creating a new cluster, but the result is the same. I am using Spring Boot to connect to the cluster.

Here is the configuration:

spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.bootstrap.servers=broker-address-here:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='api-key-here' password='api-secret-here';
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.session.timeout.ms=45000
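As a side note on the configuration above: as far as I understand, Spring Boot treats entries under spring.kafka.properties.* as pass-through settings, meaning the prefix is dropped and the rest is used as the Kafka client property name. The sketch below is plain Java with no Spring involved, and only illustrates which client keys the file is meant to produce. ClientPropsPreview and clientProps are hypothetical names made up for this illustration, not Spring's own code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper: previews the Kafka client keys implied by
// "spring.kafka.properties.*" entries. Not part of Spring or Kafka.
class ClientPropsPreview {

    static final String PREFIX = "spring.kafka.properties.";

    // Parses application.properties-style text and returns only the
    // prefixed entries, with the prefix stripped from each key.
    static Map<String, String> clientProps(String applicationProperties) {
        Properties all = new Properties();
        try {
            all.load(new StringReader(applicationProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> out = new TreeMap<>();
        for (String name : all.stringPropertyNames()) {
            if (name.startsWith(PREFIX)) {
                out.put(name.substring(PREFIX.length()), all.getProperty(name));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
                "spring.kafka.properties.sasl.mechanism=PLAIN",
                "spring.kafka.properties.bootstrap.servers=broker-address-here:9092",
                "spring.kafka.properties.security.protocol=SASL_SSL",
                "spring.kafka.properties.session.timeout.ms=45000");
        // Prints one "key = value" line per client property.
        clientProps(sample).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Comparing this list against the keys actually handed to the producer is a quick way to spot a setting that never reaches the client.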

Here are the Spring Boot beans:

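import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfiguration {

    // Broker address read from application.properties
    @Value("${spring.kafka.properties.bootstrap.servers}")
    private String bootStrapServer;

    @Bean
    public ProducerFactory<String, String> producerFactory() {

      // The producer receives only the entries listed in this map
      return new DefaultKafkaProducerFactory<String, String>(Map.of(
              ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer,
              ProducerConfig.RETRIES_CONFIG, 0,
              ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432,
              ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
              ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
      ));
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
      return new KafkaTemplate<>(producerFactory());
    }
}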

Here is my controller, which tries to send a message to the cluster:

@RestController
@RequestMapping("/produce")
public final class LogProducer {

    @Autowired 
    private KafkaTemplate<String, String> kafkaTemplate;
 
    private static final String TOPIC = "logs";
 
    // Publish a message when a POST request hits /produce/logs/v1
    @PostMapping("/logs/v1")
    public String publishMessage()
    {
 
        // Sending the message
        kafkaTemplate.send(TOPIC, "sample log message");
 
        return "Published Successfully";
    }    
}

Any suggestions on what I am doing wrong?

Upvotes: 1

Views: 4057

Answers (1)

Bagira

Reputation: 2281

I figured out the problem.

The problem is that Spring Boot does not pick up this information from application.properties. I need to pass all of it into the ProducerFactory bean; only then does it work. I also tried removing the ProducerFactory bean altogether, thinking there might be a conflict, but that did not help either.

So here is my updated code for the ProducerFactory:

@Bean
public ProducerFactory<String, String> producerFactory() {

  // Defining these properties only in application.properties did not work for me;
  // they had to be passed into the ProducerFactory bean explicitly.
  // securityProtocol, saslMechanism and saslJAAS are assumed to be @Value-injected
  // fields like bootStrapServer (their declarations are not shown here).
  return new DefaultKafkaProducerFactory<String, String>(Map.of(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer,
          ProducerConfig.RETRIES_CONFIG, 0,
          ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432,
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
          CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol,
          SaslConfigs.SASL_MECHANISM, saslMechanism,
          SaslConfigs.SASL_JAAS_CONFIG, saslJAAS
  ));
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

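I am not sure why Spring Boot behaves this way. My best guess for the original failure is that a hand-written ProducerFactory bean replaces the one Spring Boot auto-configures, and DefaultKafkaProducerFactory gives the client only the entries in the map it is passed, so the security settings in application.properties never reached the producer.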
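For anyone who wants to see the complete set of settings in one place, here is a minimal sketch in plain Java of the property map a SASL_SSL / PLAIN producer needs. It uses the raw Kafka property names as string keys instead of the ProducerConfig and SaslConfigs constants, so it runs without any dependencies. ConfluentClientProps, plainJaasConfig and producerProps are hypothetical names for this illustration, and the broker address and credentials are the same placeholders as in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that assembles the client settings discussed above.
// The keys are the plain Kafka property names.
class ConfluentClientProps {

    // Builds the JAAS line for SASL/PLAIN in the same format as the
    // sasl.jaas.config entry in the question (note the trailing semicolon).
    static String plainJaasConfig(String apiKey, String apiSecret) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username='" + apiKey + "' password='" + apiSecret + "';";
    }

    // Returns connection, security and serializer settings together.
    static Map<String, Object> producerProps(String bootstrapServers,
                                             String apiKey,
                                             String apiSecret) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", plainJaasConfig(apiKey, apiSecret));
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props =
                producerProps("broker-address-here:9092", "api-key-here", "api-secret-here");
        // Print only the keys so the secret never ends up in a log.
        props.keySet().forEach(System.out::println);
    }
}
```

A map like this can be passed to the DefaultKafkaProducerFactory constructor in place of Map.of(...). A mutable map also avoids the ten-pair limit of Map.of if more settings are added later.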

Upvotes: 1
