Reputation: 7809
Use Case:
I am using Spring Boot 2.2.5.RELEASE and Kafka 2.4.1.
JAAS/SASL is configured correctly on Kafka/ZooKeeper, since topics are created without issue with kafka-topics.bat.
Issue:
When I start the Spring Boot application, I immediately get the following errors:
kafka-server-start.bat console:
INFO [SocketServer brokerId=1] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
IDE console:
WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=yyy] Bootstrap broker localhost:9093 (id: -3 rack: null) disconnected
My application.properties configuration:
spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="spring_bO0t" password="i_am_a_spring_bO0t_user";
kafka_server_jaas.conf:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345"
    user_admin="12345"
    user_spring_bO0t="i_am_a_spring_bO0t_user";
};
Am I missing something?
Thanks in advance.
Upvotes: 14
Views: 53168
Reputation: 1396
If you use Spring Boot, you can add three properties under spring.kafka.properties in your YAML file:
spring:
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost:9092}
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="${KAFKA_USER:user}" password="${KAFKA_PASSWORD:pass}";
Upvotes: 9
Reputation: 7809
I defined the properties in the wrong place, i.e. in application.properties. Because I define my own ProducerFactory and ConsumerFactory beans, Spring Boot's auto-configured factories back off, so the spring.kafka.* settings in application.properties are not applied to my beans.
Configuring the same properties in the bean definitions resolved the issue, i.e. move the properties from application.properties to where you define your beans.
Here's an example:
@Bean
public ProducerFactory<Object, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
            "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "username", "password"
    ));
    return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
            "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "username", "password"
    ));
    return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configs.put("security.protocol", "SASL_PLAINTEXT");
    configs.put("sasl.mechanism", "PLAIN");
    // Quote each value and keep a space between the options, otherwise they run together.
    configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"username\" " +
            "password=\"password\";");
    return new KafkaAdmin(configs);
}
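The sasl.jaas.config value is easy to get wrong when it is concatenated by hand, because a missing space or quote silently changes the string. Here is a minimal sketch of a helper that builds it in one place, assuming the PLAIN mechanism. The class JaasConfigBuilder and its methods are hypothetical names, not part of Spring or Kafka, and the map uses plain string keys so the sketch runs without kafka-clients on the classpath. The backslash escaping is an assumption about how quoted JAAS values are tokenized.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring or Kafka.
final class JaasConfigBuilder {

    private static final String PLAIN_LOGIN_MODULE =
            "org.apache.kafka.common.security.plain.PlainLoginModule";

    // Assumption: backslash escapes are honoured inside double-quoted JAAS values,
    // so a quote or backslash in a credential must be escaped to stay in its token.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds: <module> required username="..." password="...";
    static String plainJaasConfig(String username, String password) {
        return String.format("%s required username=\"%s\" password=\"%s\";",
                PLAIN_LOGIN_MODULE, escape(username), escape(password));
    }

    // Collects the three security entries used by the producer, consumer and admin beans.
    static Map<String, Object> saslPlainProps(String username, String password) {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", plainJaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(plainJaasConfig("username", "password"));
    }
}
```

In a bean definition, the result could be merged with props.putAll(JaasConfigBuilder.saslPlainProps(user, pass)) before creating the factory.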
Upvotes: 23
Reputation: 101
The answer provided by @jumping_monkey is correct. However, I didn't know where to put those configurations in the ProducerFactory and ConsumerFactory beans, so I'll leave an example below for those who want to know.
In your ProducerFactory or ConsumerFactory bean respectively (mine is named generalMessageProducerFactory):
@Bean
public ProducerFactory<String, GeneralMessageDto> generalMessageProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put("sasl.mechanism", "PLAIN");
    configProps.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='YOUR_KAFKA_CLUSTER_USERNAME' password='YOUR_KAFKA_CLUSTER_PASSWORD';");
    configProps.put("security.protocol", "SASL_SSL");
    return new DefaultKafkaProducerFactory<>(configProps);
}
And also in the kafkaAdmin method of your TopicConfiguration class:
@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configs.put("sasl.mechanism", "PLAIN");
    configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='YOUR_KAFKA_CLUSTER_USERNAME' password='YOUR_KAFKA_CLUSTER_PASSWORD';");
    configs.put("security.protocol", "SASL_SSL");
    return new KafkaAdmin(configs);
}
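Both beans repeat the same three security entries, so a change to one can easily be missed in the other. One way to keep them in step is sketched below with plain Java maps. SharedSecurityConfig and withSasl are hypothetical names, the string keys are the standard Kafka client property names, and the protocol check only accepts the two SASL variants that appear in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring or Kafka.
final class SharedSecurityConfig {

    // Returns a copy of clientProps with the SASL/PLAIN settings added;
    // the caller's map is left untouched.
    static Map<String, Object> withSasl(Map<String, Object> clientProps,
                                        String protocol,
                                        String jaasConfig) {
        // Fail fast on a non-SASL protocol instead of failing at connect time.
        if (!protocol.equals("SASL_PLAINTEXT") && !protocol.equals("SASL_SSL")) {
            throw new IllegalArgumentException("Not a SASL protocol: " + protocol);
        }
        Map<String, Object> merged = new HashMap<>(clientProps);
        merged.put("security.protocol", protocol);
        merged.put("sasl.mechanism", "PLAIN");
        merged.put("sasl.jaas.config", jaasConfig);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("bootstrap.servers", "localhost:9093");
        // Placeholder credentials, as in the answer above.
        String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username='YOUR_KAFKA_CLUSTER_USERNAME' password='YOUR_KAFKA_CLUSTER_PASSWORD';";
        System.out.println(withSasl(base, "SASL_SSL", jaas).keySet());
    }
}
```

The producer, consumer and admin beans could each pass their own base map through withSasl, so the protocol and credentials are defined in one place.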
Hope this was helpful guys!
Upvotes: 10