Shafique Jamal

Reputation: 1688

Kafka console producer cannot connect to the broker

Connecting to a Kafka broker with the console producer using the following command:

KAFKA_HEAP_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true" \
bin/kafka-console-producer.sh \
--broker-list server-01.eigenroute.com:9092 \
--topic test-topic \
--producer.config config/sasl-producer.properties

fails with this warning:

>test message
[2018-01-06 15:29:10,724] WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-06 15:29:10,816] WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

My Kafka broker seems to be functioning without problems:

KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-1.0.0/config/jaas.conf -Dsun.security.krb5.debug=true -Djava.security.krb5.conf=/etc/krb5.conf -Xmx256M -Xms128M" bin/kafka-server-start.sh config/server-sasl-brokers-zookeeper.properties
[2018-01-06 19:59:27,853] INFO KafkaConfig values:
        advertised.host.name = null
        advertised.listeners = SASL_PLAINTEXT://server-01.eigenroute.com:9092
...
zookeeper.connect = zookeeper-server-01.eigenroute.com:2181,zookeeper-server-02.eigenroute.com:2181,zookeeper-server-03.eigenroute.com:2181/apps/kafka-cluster-demo
...
[2018-01-06 19:59:29,173] INFO zookeeper state changed (SaslAuthenticated) (org.I0Itec.zkclient.ZkClient)
[2018-01-06 19:59:29,207] INFO Created zookeeper path /apps/kafka-cluster-demo (kafka.server.KafkaServer)
...
[2018-01-06 19:59:30,174] INFO zookeeper state changed (SaslAuthenticated) (org.I0Itec.zkclient.ZkClient)
[2018-01-06 19:59:30,389] INFO Cluster ID = TldZ-s6DQtWxpjl045dPlg (kafka.server.KafkaServer)
[2018-01-06 19:59:30,457] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
...
[2018-01-06 19:59:33,035] INFO Successfully authenticated client: authenticationID=kafka-broker-1-1/[email protected]; authorizationID=kafka-broker-1-1/[email protected]. (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler)
[2018-01-06 19:59:33,082] INFO [ReplicaFetcherManager on broker 11] Removed fetcher for partitions test-topic-0 (kafka.server.ReplicaFetcherManager)
[2018-01-06 19:59:33,381] INFO Replica loaded for partition test-topic-0 with initial high watermark 0 (kafka.cluster.Replica)
[2018-01-06 19:59:33,385] INFO [Partition test-topic-0 broker=11] test-topic-0 starts at Leader Epoch 1 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2018-01-06 19:59:33,424] INFO [ReplicaFetcherManager on broker 11] Removed fetcher for partitions test-topic-0 (kafka.server.ReplicaFetcherManager)
[2018-01-06 19:59:33,424] INFO [Partition test-topic-0 broker=11] test-topic-0 starts at Leader Epoch 2 from offset 0. Previous Leader Epoch was: 1 (kafka.cluster.Partition)
[2018-01-06 20:09:31,261] INFO [GroupMetadataManager brokerId=11] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-01-06 20:19:31,261] INFO [GroupMetadataManager brokerId=11] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-01-06 20:29:31,261] INFO [GroupMetadataManager brokerId=11] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

Here is my producer config (config/sasl-producer.properties):

bootstrap.servers=server-01.eigenroute.com:9092
compression.type=none
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
        useKeyTab=true \
        storeKey=true  \
        keyTab="/Users/shafiquejamal/allfiles/kerberos/producer1.whatever.keytab" \
        principal="producer1/[email protected]";

Here is my broker config (config/server-sasl-brokers-zookeeper.properties):

broker.id=11
listeners=SASL_PLAINTEXT://server-01.eigenroute.com:9092
advertised.listeners=SASL_PLAINTEXT://server-01.eigenroute.com:9092
# host.name=server-01.eigenroute.com
security.inter.broker.protocol=SASL_PLAINTEXT
# sasl.kerberos.service.name=kafka-broker-1-1/server-01.eigenroute.com
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/log/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper-server-01.eigenroute.com:2181,zookeeper-server-02.eigenroute.com:2181,zookeeper-server-03.eigenroute.com:2181/apps/kafka-cluster-demo
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

Note that I am using SASL authentication between the Kafka broker and ZooKeeper, and between the Kafka broker and Kafka clients (in this case, just one producer). Here are the contents of my Kafka broker jaas.conf file:

KafkaServer {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/kafka/kafka_2.11-1.0.0/config/kafka-broker-1-1.server-01.eigenroute.com.keytab"
  storeKey=true
  useTicketCache=false
  serviceName="kafka-broker-1-1"
  principal="kafka-broker-1-1/[email protected]";
};

// This is for the broker acting as a client to ZooKeeper
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/kafka/kafka_2.11-1.0.0/config/kafka-broker-1-1.server-01.eigenroute.com.keytab"
  storeKey=true
  useTicketCache=false
  serviceName="zookeeper"
  principal="kafka-broker-1-1/[email protected]";
};

In my /etc/hosts file, I have the following entry:

127.0.0.1 server-01.eigenroute.com
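
A minimal sketch of how to confirm, from the machine running the producer, that this hostname resolves as expected and that something is listening on the broker port (assuming getent and nc are available there):

# How does the hostname resolve on this machine?
getent hosts server-01.eigenroute.com

# Is anything accepting TCP connections on the broker port?
nc -vz server-01.eigenroute.com 9092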

Any suggestions on why the producer client cannot connect to the Kafka broker? Thanks!

UPDATE: Here is the content of the ZooKeeper znode /apps/kafka-cluster-demo/brokers/ids/11:

[zk: zookeeper-server-02.eigenroute.com:2181(CONNECTED) 27] get /apps/kafka-cluster-demo/brokers/ids/11
{"listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT"},"endpoints":["SASL_PLAINTEXT://server-01.eigenroute.com:9092"],"jmx_port":-1,"host":null,"timestamp":"1515275931134","port":-1,"version":4}
cZxid = 0x2c0000023c
ctime = Sat Jan 06 21:58:51 UTC 2018
mZxid = 0x2c0000023c
mtime = Sat Jan 06 21:58:51 UTC 2018
pZxid = 0x2c0000023c
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1001d6237f1001c
dataLength = 209
numChildren = 0
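
Since the broker appears to be registered in ZooKeeper with the expected endpoint, a client-side connectivity check can help isolate the problem. A minimal sketch that reuses the producer properties file as a command config (assuming the same krb5.conf system property is passed via KAFKA_OPTS):

KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf" \
bin/kafka-broker-api-versions.sh \
  --bootstrap-server server-01.eigenroute.com:9092 \
  --command-config config/sasl-producer.properties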

Upvotes: 3

Views: 6265

Answers (1)

Shafique Jamal

Reputation: 1688

There were two problems in my configuration above. The first is that, in the producer properties file config/sasl-producer.properties, the line

sasl.kerberos.service.name=kafka

should instead be

sasl.kerberos.service.name=kafka-broker-1-1

This is because the service name configured in the client must match the service name used by the broker (the serviceName in the broker's jaas.conf, i.e. the primary of its principal, kafka-broker-1-1). After fixing this, a second problem arose:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after...

The following post had the answer for solving this:

ERROR Error when sending message to topic

For the Kafka broker, in config/server-sasl-brokers-zookeeper.properties I had to change

listeners=SASL_PLAINTEXT://server-01.eigenroute.com:9092

to

listeners=SASL_PLAINTEXT://0.0.0.0:9092

(This might have something to do with running on AWS.) Now everything works: the producer can write to the topic and the consumer can read from it.
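
For reference, a minimal sketch of the two corrected settings side by side (same hostnames and files as above; everything else unchanged):

# config/sasl-producer.properties (client side)
bootstrap.servers=server-01.eigenroute.com:9092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
# must match the serviceName / principal primary used by the broker
sasl.kerberos.service.name=kafka-broker-1-1

# config/server-sasl-brokers-zookeeper.properties (broker side)
# bind to all interfaces, but keep advertising the public hostname to clients
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://server-01.eigenroute.com:9092

The round trip can then be verified with a console consumer using an analogous SASL config (a hypothetical config/sasl-consumer.properties mirroring the producer one):

KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf" \
bin/kafka-console-consumer.sh \
  --bootstrap-server server-01.eigenroute.com:9092 \
  --topic test-topic \
  --from-beginning \
  --consumer.config config/sasl-consumer.properties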

Upvotes: 2
