Reputation: 557
I'm trying to figure out how to create a working Kafka producer against an AWS MSK cluster. The cluster has TLS and SASL/SCRAM enabled. This is my first time using Kafka and the documentation on this is rather slim. The code that I currently have, which works against an unencrypted local cluster, is:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverBootstrap);
props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer = new KafkaProducer<>(props);
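For context, once the producer is built I send messages with the standard API; the topic name here is just a placeholder:

// org.apache.kafka.clients.producer.ProducerRecord
producer.send(new ProducerRecord<>("example-topic", "some-key", "some-value"));
producer.flush();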
I have String-typed variables username and password, which have been pulled from AWS Secrets Manager and contain the username and password for the cluster. What additional properties do I need to specify?
The docs by AWS and on the Kafka website are all about generating JAAS and truststore files on the local file system, which would contain the secrets:
https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html
This is a non-starter for me. Basically, I need to build a generic Docker image that receives, as an environment variable, the name of the AWS Secrets Manager secret containing the username and password for the cluster.
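To make that concrete, this is roughly what I have in mind (a rough sketch only, assuming the AWS SDK v2 and Jackson are on the classpath; the SECRET_NAME environment variable and the JSON shape of the secret are my own conventions, not anything MSK requires):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest;

// Reads the secret named by the SECRET_NAME env var and parses it as
// {"username":"...","password":"..."} JSON.
private static String[] readClusterCredentials() throws Exception {
    String secretName = System.getenv("SECRET_NAME");
    try (SecretsManagerClient client = SecretsManagerClient.create()) {
        String secretJson = client.getSecretValue(
                GetSecretValueRequest.builder().secretId(secretName).build())
            .secretString();
        JsonNode secret = new ObjectMapper().readTree(secretJson);
        return new String[] {secret.get("username").asText(), secret.get("password").asText()};
    }
}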
I would assume this is how people authenticate to a cluster by default, but there are no instructions describing how to do it. The reason I'm not using the MSK IAM integration is that we will probably hit its limit of 3000 connections per broker.
Upvotes: 6
Views: 8205
Reputation: 557
After lots of trial and error, this actually works against Amazon MSK:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverBootstrap);
props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
if (useScram()) {
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("ssl.truststore.location", "/truststore/kafka.client.truststore.jks");
    props.put("sasl.jaas.config", getScramAuthString());
}
producer = new KafkaProducer<>(props);
The getScramAuthString() function is defined as follows:
private static String getScramAuthString() {
    return String.format("org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=\"%s\"\npassword=\"%s\";", username, password);
}
Here, username and password are read from environment variables that ECS injects from AWS Secrets Manager.
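Note that the truststore referenced by ssl.truststore.location contains only CA certificates, no secrets, so it can be generated inside a generic image. One sketch of doing that at startup, assuming the JVM's default cacerts already trusts the Amazon certificate authorities used by MSK (which is effectively what the copy-the-cacerts step in the AWS docs amounts to); the method name prepareTruststore() is just illustrative:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Copies the JVM's default trust store to the path used in ssl.truststore.location above.
private static void prepareTruststore() throws IOException {
    Path cacerts = Paths.get(System.getProperty("java.home"), "lib", "security", "cacerts");
    Path target = Paths.get("/truststore/kafka.client.truststore.jks");
    Files.createDirectories(target.getParent());
    Files.copy(cacerts, target, StandardCopyOption.REPLACE_EXISTING);
}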
Upvotes: 5