PAA

Reputation: 12035

Exception in thread "main" org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed

I am trying to use Confluent Cloud hosted on Azure in my project. We have an API key and secret, but I am unable to connect and read messages from a Kafka topic. I went through this link: https://docs.confluent.io/cloud/current/access-management/authenticate/api-keys/best-practices-api-keys.html, but it's not clear how to set the key and secret in the Java consumer code. I am also getting the error below:

Exception in thread "main" org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed

I used the code below:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HelloConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, AppConfigs.applicationID);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "XXXXXXXXXX.azure.confluent.cloud:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "someid");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + "apiKeyValue" + "\" password=\"" + "secreteValue" + "\";");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList("test-topic"));

        // Poll forever and print every record received
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Key: " + record.key() + ", Value: " + record.value());
                System.out.println("Partition: " + record.partition() + ", Offset:" + record.offset());
            }
        }
    }
}

Upvotes: 0

Views: 321

Answers (1)

David VanGorder

Reputation: 71

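I haven't tested your code, but this looks like a variable reference issue. In the props.put(SaslConfigs.SASL_JAAS_CONFIG, .... call you are concatenating the string literals "apiKeyValue" and "secreteValue" for the username and password, so those literal words are sent instead of your actual key and secret: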

username=\"" + "apiKeyValue" + "\" password=\"" + "secreteValue" + "\";");

should likely be

username=\"" + apiKeyValue + "\" password=\"" + secreteValue + "\";");

It could be something else after this gets fixed, but I'd start there.
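
As a rough sketch of what passing real values might look like, the snippet below builds the JAAS line from variables rather than literals. It is untested against Confluent Cloud and makes a few assumptions: the class name JaasConfigSketch, its two helper methods, and the environment variable names CONFLUENT_API_KEY and CONFLUENT_API_SECRET are all made up for illustration. The string keys are the plain property names behind the CommonClientConfigs and SaslConfigs constants used in the question.

```java
import java.util.Properties;

// Sketch only: builds the SASL/PLAIN settings from variables instead of string literals.
final class JaasConfigSketch {

    // Builds the JAAS line for PlainLoginModule from the given credentials.
    static String buildJaasConfig(String apiKey, String apiSecret) {
        return String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
            apiKey, apiSecret);
    }

    // Returns only the security-related properties; merge them into the consumer Properties.
    static Properties saslProperties(String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", buildJaasConfig(apiKey, apiSecret));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical environment variable names; use whatever your deployment provides.
        String apiKey = System.getenv("CONFLUENT_API_KEY");
        String apiSecret = System.getenv("CONFLUENT_API_SECRET");
        if (apiKey == null || apiSecret == null) {
            throw new IllegalStateException("Set CONFLUENT_API_KEY and CONFLUENT_API_SECRET first");
        }
        Properties props = saslProperties(apiKey, apiSecret);
        // Print only the non-secret settings to confirm they were applied.
        System.out.println(props.getProperty("security.protocol") + " / " + props.getProperty("sasl.mechanism"));
    }
}
```

In your consumer you could then call props.putAll(JaasConfigSketch.saslProperties(apiKey, apiSecret)) in place of the three security-related props.put lines. Reading the key and secret from the environment also keeps them out of source control.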

Upvotes: 0
