Reputation: 13
everyone. This is my first post here, so, please, pardon my finesse skill of writing stack overflow questions.
I am having trouble using AdminClient from org.apache.kafka.clients.admin.AdminClient.
The issue at hand is this: I initiate a secure connection to our broker server (running kafka 1.0.0) using SASL SSL.
it works just fine when I am running a consumer against that same broker with the same security settings. However when I am doing AdminClient stuff, it seems to have worked, but I see no traffic coming out of my machine to the broker server whatsoever in wireshark, and what I am trying to do does not happen on the broker side.
here is my code:
public class AclProvisioner {
//set up variables
private static Properties props = new Properties();
private static ClassLoader classloader = Thread.currentThread().getContextClassLoader();
static String mid = null;
static String topic = null;
public static void main(String... args) {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkabroker.mydomain.com:9094");
props.put("security.protocol","SASL_SSL");
props.put("ssl.truststore.location", "C:\\Temp\\mydomain.root.jks" );
props.put("ssl.truststore.password","my_truststore_password");
props.put("sasl.mechanism","GSSAPI");
props.put("sasl.kerberos.service.name","kafka_admin_username");
AdminClient adminClient = AdminClient.create(props);
// generate ACLs
AclBinding newTopicReadAcl = new AclBinding( new Resource(ResourceType.TOPIC, "TestTopic"),
new AccessControlEntry("MY_TESTID", "*", AclOperation.READ, AclPermissionType.ALLOW) );
AclBinding newTopicDescribeAcl = new AclBinding( new Resource(ResourceType.TOPIC, "TestTopic"),
new AccessControlEntry("MY_TESTID", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW) );
AclBinding newGroupReadAcl = new AclBinding( new Resource(ResourceType.GROUP, "TestGroup"),
new AccessControlEntry("MY_TESTID", "*", AclOperation.READ, AclPermissionType.ALLOW) );
Collection<AclBinding> aclList = Arrays.asList(newTopicReadAcl, newTopicDescribeAcl, newGroupReadAcl);
adminClient.createAcls(aclList);
// create topic
int numPartitions = 6;
short replicasFactor = 2;
NewTopic newTopic = new NewTopic("Demo.JavaAdminClientTest", numPartitions, replicasFactor);
Map<String, String> configMap = new HashMap<>();
configMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
configMap.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip");
newTopic.configs(configMap);
List<NewTopic> topics = Arrays.asList(newTopic);
adminClient.createTopics( topics );
}
If I ssh to the server itself and export my keytab and kinit, I am able to generate ACLs just fine using CLI method. I am also able to run a consumer using the same exact properties (as far as security goes).
Another thing I have discovered, is that if I put a server that does not exist or can not be reached, the program does fail, telling me that it could not resolve the BOOTSTRAP_SERVER_NAME.
same exact behavior happens if instead of ACL I attempt to create Topics. Once again, that does work just fine out of CLI.
I appreciate any pointers!
Cheers
Upvotes: 1
Views: 4995
Reputation: 26885
All AdminClient methods are asynchronous and only return Future objects.
So if you don't explicitly wait on the futures to complete, your program just terminates before the AdminClient has time to send anything over the network.
You can use all()
or values()
on the CreateAclsResult
[0] and CreateTopicsResults
[1] to retrieve KafkaFuture
[2] objects. Then use get()
on them to wait for example.
[0] http://kafka.apache.org/11/javadoc/org/apache/kafka/clients/admin/CreateAclsResult.html
[1] http://kafka.apache.org/11/javadoc/org/apache/kafka/clients/admin/CreateTopicsResult.html
[2] http://kafka.apache.org/11/javadoc/org/apache/kafka/common/KafkaFuture.html
Upvotes: 1