Reputation: 12035
I am trying to use the Confluent Cloud hosted on Azure in my project and we do have API Key and Secret, but I am not able to connect and unable to read the message from kafka topic. I went thriugh the link: https://docs.confluent.io/cloud/current/access-management/authenticate/api-keys/best-practices-api-keys.html, but its not clear how to set it into Java consumer code and also getting below error
Exception in thread "main" org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed
I used below code
public class HelloConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, AppConfigs.applicationID);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "XXXXXXXXXX.azure.confluent.cloud:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "someid");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + "apiKeyValue" + "\" password=\"" + "secreteValue" + "\";");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("test-topic"));
while(true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records){
System.out.println("Key: " + record.key() + ", Value: " + record.value());
System.out.println("Partition: " + record.partition() + ", Offset:" + record.offset());
}
}
}
Upvotes: 0
Views: 321
Reputation: 71
Without doing any testing with your code, this looks like a variable reference issue? You are joining strings for username and password in the props.put(SaslConfigs.SASL_JAAS_CONFIG, ...
.
username=\"" + "apiKeyValue" + "\" password=\"" + "secreteValue" + "\";");
should likely be
username=\"" + apiKeyValue + "\" password=\"" + secreteValue + "\";");
It could be something else after this gets fixed, but I'd start there.
Upvotes: 0