Vandeperre Maarten

Reputation: 576

Kafka - Zookeeper - ACL configuration

Context

I'm trying to set up a distributed logging system based on Kafka. (I know there are existing tools such as Logstash.) I would like to be able to add a Storm topology behind it later, for example to send notifications when a flow is getting slower.

Setup

I have a running server (WildFly Swarm, authenticated with Keycloak) on port 8082 that hosts my logging functionality. I can push log lines to this server through REST. Behind the scenes, a Kafka producer forwards the messages to Kafka.

My server.properties (for the broker):

listeners=PLAINTEXT://localhost:9092
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:Bob;User:Alice;User:anonymous

My ACL configuration:

call kafka\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic
call kafka\bin\windows\kafka-acls.bat --add --allow-principal User:anonymous --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181  --allow-host http://localhost:8082 --operation Read --operation Write --topic testtopic
call kafka\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:anonymous --consumer --topic testtopic --group group --allow-host http://localhost:8082
call kafka\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:anonymous --producer --topic testtopic --allow-host http://localhost:8082
call kafka\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:anonymous --producer --topic testtopic --allow-host 192.168.3.63

My (Java) producer properties:

    import java.util.Properties;
    // CDI producer annotation (assumed; the original snippet does not show its imports)
    import javax.enterprise.inject.Produces;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;

    @Produces
    private Producer<String, String> stringStringProducer(){
        Properties props = new Properties();
        // Plain (unauthenticated) connection to the local broker
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try {
            return new KafkaProducer<>(props);
        } catch (Exception e) {
            // Surface configuration errors as an unchecked exception
            throw new RuntimeException(e);
        }
    }

Problem

When I try to produce a message through the Java producer (and the console producer) I get:

[org.apache.kafka.clients.NetworkClient] (kafka-producer-network-thread | producer-6) Error while fetching metadata with correlation id 10 : {testtopic=UNKNOWN_TOPIC_OR_PARTITION}

Does anyone know what I'm doing wrong?

First solution

I managed to get past this error message by adding ACLs for User:ANONYMOUS with host 127.0.0.1:

call kafka\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic

call kafka\bin\windows\kafka-acls.bat --add --allow-principal User:anonymous --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181  --allow-host http://localhost:8082 --operation Read --operation Write --topic testtopic
call kafka\bin\windows\kafka-acls.bat --add --allow-principal User:ANONYMOUS --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181  --allow-host 127.0.0.1 --operation Read --operation Write --topic testtopic

call kafka\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:anonymous --consumer --topic testtopic --group group --allow-host http://localhost:8082
call kafka\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:ANONYMOUS --consumer --topic testtopic --group group --allow-host 127.0.0.1

call kafka\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:anonymous --producer --topic testtopic --allow-host http://localhost:8082
call kafka\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:ANONYMOUS --producer --topic testtopic --allow-host 127.0.0.1

I found the issue by looking at the log files: open log4j.properties in the Kafka folder and change the log4j.logger.kafka.authorizer.logger property to DEBUG. The authorizer log then shows the concrete error (i.e. the missing permissions).
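To illustrate why the original ACLs never matched, here is a minimal sketch in Java. It is a simplified model, not Kafka's actual authorizer code, and the class and method names are hypothetical. It assumes what the fix above suggests: the principal is compared case-sensitively, and the ACL host must equal the client's IP address (or be the wildcard *), so a URL such as http://localhost:8082 can never match.

```java
import java.util.Objects;

/** Simplified, hypothetical model of ACL matching; not Kafka's real implementation. */
final class AclMatchSketch {
    static final String WILDCARD_HOST = "*";

    /** Returns true when an ACL entry (principal + host) applies to the given request. */
    static boolean matches(String aclPrincipal, String aclHost,
                           String requestPrincipal, String requestIp) {
        // Principals are compared as exact strings, so "anonymous" != "ANONYMOUS".
        boolean principalOk = Objects.equals(aclPrincipal, requestPrincipal);
        // Hosts are compared against the client's IP address, or match everything via "*".
        boolean hostOk = WILDCARD_HOST.equals(aclHost) || Objects.equals(aclHost, requestIp);
        return principalOk && hostOk;
    }

    public static void main(String[] args) {
        String principal = "User:ANONYMOUS"; // principal of an unauthenticated PLAINTEXT client
        String ip = "127.0.0.1";             // address the broker sees for a local client

        // Original ACL: wrong case and a URL instead of an IP address.
        System.out.println(matches("User:anonymous", "http://localhost:8082", principal, ip)); // false
        // Fixed ACL from the commands above.
        System.out.println(matches("User:ANONYMOUS", "127.0.0.1", principal, ip));             // true
    }
}
```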

New problem

When I want to produce a message, I now get:

[2017-03-28 15:39:07,704] WARN Error while fetching metadata with correlation id 0 : {testtopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-03-28 15:39:07,800] WARN Error while fetching metadata with correlation id 1 : {testtopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-03-28 15:39:07,912] WARN Error while fetching metadata with correlation id 2 : {testtopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-03-28 15:39:08,024] WARN Error while fetching metadata with correlation id 3 : {testtopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

Does anyone know how to fix this?

Solved

I added "ANONYMOUS" to the super users within the broker config (server.properties):

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:Bob;User:Alice;User:ANONYMOUS
#port = 9092
#advertised.host.name = localhost
#listeners=SASL_SSL://localhost:9092
#security.inter.broker.protocol=SASL_SSL
#sasl.mechanism.inter.broker.protocol=PLAIN
#sasl.enabled.mechanisms=PLAIN
host.name=127.0.0.1
advertised.host.name=localhost
advertised.port=9092

Upvotes: 3

Views: 6195

Answers (1)

atoMerz

Reputation: 7672

The problem occurs because you have enabled authorization in the following line:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

But the broker itself runs as User:ANONYMOUS, because its only listener is PLAINTEXT:

listeners=PLAINTEXT://localhost:9092

That is, there's no way for the broker to authenticate itself. In my case (SSL authentication) I had to do the following:

  1. Enable inter-broker security using security.inter.broker.protocol=SSL.
  2. Disable the PLAINTEXT port of the broker(s) by setting listeners=SSL://broker1:9092 (note that there is no PLAINTEXT://broker1:9091 entry).
  3. Define ACLs for the user defined in my SSL certificate using kafka-acls.sh.
  4. Restart the broker.
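As a rough sketch of what a matching client configuration might look like once the PLAINTEXT listener is gone, the snippet below builds producer properties for an SSL listener. The host name, file paths and password in the usage example are placeholders, not values from the setup above, and the class and method names are hypothetical; the property keys are standard Kafka client settings. Only java.util.Properties is used, so the sketch compiles without the Kafka client library. In a real application the result would be passed to new KafkaProducer<>(props).

```java
import java.util.Properties;

/** Hypothetical helper that assembles SSL producer settings; all values are placeholders. */
final class SslProducerConfigSketch {

    static Properties sslProducerProps(String bootstrapServers,
                                       String trustStorePath,
                                       String keyStorePath,
                                       String storePassword) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Talk to the SSL listener instead of PLAINTEXT.
        props.put("security.protocol", "SSL");
        // Trust store: lets the client verify the broker's certificate.
        props.put("ssl.truststore.location", trustStorePath);
        props.put("ssl.truststore.password", storePassword);
        // Key store: the client certificate whose subject becomes the ACL principal.
        props.put("ssl.keystore.location", keyStorePath);
        props.put("ssl.keystore.password", storePassword);
        props.put("ssl.key.password", storePassword);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Properties props = sslProducerProps(
                "broker1:9092",
                "/path/to/client.truststore.jks",
                "/path/to/client.keystore.jks",
                "changeit");
        System.out.println(props.getProperty("security.protocol"));
    }
}
```

With client authentication over SSL, the ACLs from step 3 are defined for the principal taken from the client certificate rather than for User:ANONYMOUS.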

P.S. The workaround in your answer (making User:ANONYMOUS a super user) is discouraged. You can read about its implications here.

Upvotes: 5
