Reputation: 21
I am trying to implement encryption and authentication using SSL/TLS in Kafka on Windows. I created the TLS keys and Certificates according to this link and set the following properties in server.properties:
listeners=SSL://0.0.0.0:9092
advertised.listeners=SSL://<host-name>:9092 *
ssl.keystore.location=C:/DataGrid/Kafka/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=C:/DataGrid/Kafka/kafka.server.truststore.jks
ssl.truststore.password=test1234
security.inter.broker.protocol=SSL
ssl.client.auth=required
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:<host-name>
security.protocol=SSL
But when trying to create a topic using the following command in cmd:
kafka-topics.bat --create --bootstrap-server <host-name>:9092 --replication-factor 1 --partitions 1 --topic test
I get this error:
C:\DataGrid\Kafka\bin\windows>kafka-topics.bat --create --bootstrap-server vmnala:9092 --replication-factor 1 --partitions 1 --topic test
[2020-06-17 11:52:49,629] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152)
at java.lang.Thread.run(Thread.java:748)
Looking at server.log I see the following error:
[2020-06-17 11:52:49,567] INFO [SocketServer brokerId=1] Failed authentication with /10.2.9.200 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
Besides, when trying to create a consumer in python I get:
>>> from kafka import KafkaConsumer
>>> c = KafkaConsumer(bootstrap_servers='vmnala:9092', security_protocol = "SSL")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:\Python\lib\site-packages\kafka\consumer\group.py", line 324, in __init__
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "C:\Python\lib\site-packages\kafka\client_async.py", line 221, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "C:\Python\lib\site-packages\kafka\client_async.py", line 826, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
>>>
Any ideas what can be the cause of it?
Many thanks!
Upvotes: 1
Views: 969
Reputation: 26885
This happens because you configured your brokers to use SSL
but you are not configuring your clients to do so. By default, clients use PLAINTEXT
.
For kafka-topics
, you need to give it a file via --command-config
that contains the required properties. Something like:
ssl.truststore.location=<PATH>
ssl.truststore.password=<PASSWORD>
security.protocol=SSL
For the Python client, you need to specify the corresponding configs in the constructor. See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html#kafka.KafkaConsumer for the list of configurations
Upvotes: 1