Reputation: 3651
ULTIMATE GOAL:
I make a research trying to understand how Kafka and OPA Plugin integrated between each other and how easy it will be to use OPA Plugin in production.
STEPS:
I followed OPA Policy Agent tutorial. The part, I am stuck on is how to make Broker work and recognise localhost
in the docker-compose setup.
My docker-compose.yaml
file:
services:
nginx:
image: nginx:1.21.4
volumes:
- "./bundles:/usr/share/nginx/html"
ports:
- "80:80"
opa:
image: openpolicyagent/opa:0.51.0
ports:
- "8181:8181"
command:
- "run"
- "--server"
- "--set=decision_logs.console=true"
- "--set=services.authz.url=http://nginx"
- "--set=bundles.authz.service=authz"
- "--set=bundles.authz.resource=bundle.tar.gz"
depends_on:
- nginx
zookeeper:
image: confluentinc/cp-zookeeper:6.2.1
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
- ZOOKEEPER_CLIENT_PORT=2181
broker:
image: confluentinc/cp-kafka:6.2.1
ports:
- "9093:9093"
environment:
# Set cache expiry to low value for development in order to see decisions
KAFKA_OPA_AUTHORIZER_CACHE_EXPIRE_AFTER_SECONDS: 10
KAFKA_OPA_AUTHORIZER_URL: http://opa:8181/v1/data/kafka/authz/allow
KAFKA_AUTHORIZER_CLASS_NAME: org.openpolicyagent.kafka.OpaAuthorizer
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: SSL://localhost:9093
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_SSL_KEYSTORE_FILENAME: server.keystore
KAFKA_SSL_KEYSTORE_CREDENTIALS: credentials.txt
KAFKA_SSL_KEY_CREDENTIALS: credentials.txt
KAFKA_SSL_TRUSTSTORE_FILENAME: server.truststore
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: credentials.txt
KAFKA_SSL_CLIENT_AUTH: required
# KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
# KAFKA_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
# KAFKA_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
CLASSPATH: "/plugin/*"
volumes:
- "./plugin:/plugin"
- "./cert/server:/etc/kafka/secrets"
depends_on:
- opa
- zookeeper
The folder structure is the same as in tutorial with generated certificates etc.
The stack trace of the error is below:
broker_1 | [2023-04-27 08:40:46,391] INFO [Controller id=1, targetBrokerId=1] Failed authentication with localhost/127.0.0.1 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
broker_1 | [2023-04-27 08:40:46,392] ERROR [Controller id=1, targetBrokerId=1] Connection to node 1 (localhost/127.0.0.1:9093) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
broker_1 | [2023-04-27 08:40:46,393] WARN [RequestSendThread controllerId=1] Controller 1's connection to broker localhost:9093 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
broker_1 | org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
broker_1 | Caused by: javax.net.ssl.SSLHandshakeException: No name matching localhost found
broker_1 | at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
broker_1 | at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:349)
broker_1 | at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:292)
broker_1 | at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:287)
broker_1 | at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1357)
broker_1 | at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1232)
broker_1 | at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1175)
broker_1 | at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
broker_1 | at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443)
broker_1 | at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074)
broker_1 | at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061)
broker_1 | at java.base/java.security.AccessController.doPrivileged(Native Method)
broker_1 | at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008)
broker_1 | at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:430)
broker_1 | at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:514)
broker_1 | at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:368)
broker_1 | at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:291)
broker_1 | at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
broker_1 | at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
broker_1 | at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
broker_1 | at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
broker_1 | at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:74)
broker_1 | at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:292)
broker_1 | at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:246)
broker_1 | at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
broker_1 | Caused by: java.security.cert.CertificateException: No name matching localhost found
broker_1 | at java.base/sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:234)
broker_1 | at java.base/sun.security.util.HostnameChecker.match(HostnameChecker.java:103)
broker_1 | at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
broker_1 | at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:429)
broker_1 | at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:283)
broker_1 | at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
broker_1 | at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335)
broker_1 | ... 20 more
DEBUGGING STEPS:
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
KAFKA_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
KAFKA_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
It leaves me with the situation when messages aren't acknowledged and broker isn't reachable.
localhost
to internal.docker.host
and to 0.0.0.0
internal.docker.host gives the same error. 0.0.0.0 simply non-routable.
I also looked for other answers but not much information, I could operate with. Most of them nailed down to those 2 above.
Version changing didn't help much as Kafka is evolving itself and in new versions there are other challenges and required changes to the docker-compose (for example deprecated Zookeeper).
MACHINE INFO:
For running docker, I use colima
engine which is a decent alternative to docker
engine from Docker Inc.
Machine is MacOS.
QUESTION:
What can I do to fix this current issue? Or maybe land to another more common, easier to fix issue?
Upvotes: 1
Views: 538