Reputation: 2281
I have created a Kafka cluster on Confluent Cloud, but I am unable to connect to it. When I run the producer, I get the following error:
[Producer clientId=producer-1] Node -1 disconnected.
2023-06-05T06:09:20.826+05:30 INFO 25324 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Cancelled in-flight API_VERSIONS request with correlation id 189 due to node -1 being disconnected (elapsed time since creation: 253ms, elapsed time since send: 253ms, request timeout: 30000ms)
2023-06-05T06:09:20.827+05:30 WARN 25324 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker (id: -1 rack: null) disconnected
I tried creating a new cluster, but the result is the same. I am using Spring Boot to connect to the cluster.
Here is the configuration:
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.bootstrap.servers=broker-address-here:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='api-key-here' password='api-secret-here';
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.session.timeout.ms=45000
Here are the Spring Boot beans:
@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.properties.bootstrap.servers}")
    private String bootStrapServer;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<String, String>(Map.of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer,
                AdminClientConfig.RETRIES_CONFIG, 0,
                ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
        ));
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Here is my controller, which tries to send a message to the cluster:
@RestController
@RequestMapping("/produce")
public final class LogProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static final String TOPIC = "logs";

    // Publish a message when a POST request hits /produce/logs/v1
    @PostMapping("/logs/v1")
    public String publishMessage() {
        // send() is asynchronous, so this returns before the broker acknowledges the record
        kafkaTemplate.send(TOPIC, "sample log message");
        return "Published Successfully";
    }
}
Any suggestions on what I am doing wrong?
Upvotes: 1
Views: 4057
Reputation: 2281
I figured out the problem.
The problem is that Spring Boot does not pick up the security settings from application.properties. I have to inject all of them into the ProducerFactory bean; only then does it work. I also tried removing the ProducerFactory bean altogether, thinking there might be a conflict, but that did not help either.
So here is my updated code for the ProducerFactory:
// Assumed: these fields are injected from application.properties in the same way as bootStrapServer.
@Value("${spring.kafka.properties.security.protocol}")
private String securityProtocol;
@Value("${spring.kafka.properties.sasl.mechanism}")
private String saslMechanism;
@Value("${spring.kafka.properties.sasl.jaas.config}")
private String saslJAAS;

@Bean
public ProducerFactory<String, String> producerFactory() {
    // Defining these properties only in application.properties does not work.
    // They have to be passed to the ProducerFactory bean explicitly; only then does it work.
    return new DefaultKafkaProducerFactory<String, String>(Map.of(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer,
            AdminClientConfig.RETRIES_CONFIG, 0,
            ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol,
            SaslConfigs.SASL_MECHANISM, saslMechanism,
            SaslConfigs.SASL_JAAS_CONFIG, saslJAAS
    ));
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
I have no idea why Spring Boot behaves this way.
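One likely contributing factor, offered as a guess rather than a confirmed diagnosis: a DefaultKafkaProducerFactory built from a hand-written Map only sees the keys in that map, and declaring your own ProducerFactory bean makes Spring Boot's auto-configured one back off. This does not explain why removing the bean did not help. If you keep the custom bean, a small helper can copy every spring.kafka.properties.* entry into the map instead of listing each key by hand. The sketch below uses only the JDK; the class name ClientConfigSketch and the method clientConfig are hypothetical and not part of Spring or Kafka.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper, not part of Spring or Kafka: copies every
// "spring.kafka.properties.*" entry into a plain Kafka client config map.
final class ClientConfigSketch {

    // Prefix used in application.properties for pass-through client settings.
    static final String PREFIX = "spring.kafka.properties.";

    static Map<String, Object> clientConfig(Properties appProps) {
        Map<String, Object> config = new TreeMap<>();
        for (String name : appProps.stringPropertyNames()) {
            if (name.startsWith(PREFIX)) {
                // e.g. "spring.kafka.properties.sasl.mechanism" -> "sasl.mechanism"
                config.put(name.substring(PREFIX.length()), appProps.getProperty(name));
            }
        }
        return config;
    }

    public static void main(String[] args) {
        Properties app = new Properties();
        app.setProperty("spring.kafka.properties.bootstrap.servers", "broker-address-here:9092");
        app.setProperty("spring.kafka.properties.security.protocol", "SASL_SSL");
        app.setProperty("spring.kafka.properties.sasl.mechanism", "PLAIN");
        app.setProperty("server.port", "8080"); // unrelated key, ignored by the helper

        // Prints only the three Kafka client keys with the prefix removed.
        System.out.println(clientConfig(app));
    }
}
```

The resulting map could then be merged with the serializer settings and handed to the DefaultKafkaProducerFactory constructor, so that new entries added under spring.kafka.properties.* reach the producer without another code change.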
Upvotes: 1