Reputation: 1920
I am trying to expose KAFKA in my Kubernetes setup for external usage using node port.
My Helmcharts kafka-service.yaml is as follows:
apiVersion: v1
kind: Service
metadata:
name: kafka
namespace: test
labels:
app: kafka-test
unit: kafka
spec:
type: NodePort
selector:
app: test-app
unit: kafka
parentdeployment: test-kafka
ports:
- name: kafka
port: 9092
targetPort: 9092
nodePort: 30092
protocol: TCP
kafka-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka
namespace: {{ .Values.test.namespace }}
labels:
app: test-app
unit: kafka
spec:
replicas: 1
template:
metadata:
labels:
app: test-app
unit: kafka
parentdeployment: test-kafka
spec:
hostname: kafka
subdomain: kafka
securityContext:
fsGroup: {{ .Values.test.groupID }}
containers:
- name: kafka
image: test_kafka:{{ .Values.test.kafkaImageTag }}
imagePullPolicy: IfNotPresent
ports:
- containerPort: 9092
env:
- name: IS_KAFKA_CLUSTER
value: 'false'
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper:2281
- name: KAFKA_LISTENERS
value: SSL://:9092
- name: KAFKA_KEYSTORE_PATH
value: /opt/kafka/conf/kafka.keystore.jks
- name: KAFKA_TRUSTSTORE_PATH
value: /opt/kafka/conf/kafka.truststore.jks
- name: KAFKA_KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
name: kafka-secret
key: jkskey
- name: KAFKA_TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
name: kafka-secret
key: jkskey
- name: KAFKA_LOG_DIRS
value: /opt/kafka/data
- name: KAFKA_ADV_LISTENERS
value: SSL://kafka:9092
- name: KAFKA_CLIENT_AUTH
value: none
volumeMounts:
- mountPath: "/opt/kafka/conf"
name: kafka-conf-pv
- mountPath: "/opt/kafka/data"
name: kafka-data-pv
volumes:
- name: kafka-conf-pv
persistentVolumeClaim:
claimName: kafka-conf-pvc
- name: kafka-data-pv
persistentVolumeClaim:
claimName: kafka-data-pvc
selector:
matchLabels:
app: test-app
unit: kafka
parentdeployment: test-kafka
zookeeper service yaml
apiVersion: v1
kind: Service
metadata:
name: zookeeper
namespace: {{ .Values.test.namespace }}
labels:
app: test-ra
unit: zookeeper
spec:
type: ClusterIP
selector:
app: test-ra
unit: zookeeper
parentdeployment: test-zookeeper
ports:
- name: zookeeper
port: 2281
zookeeper deployment yaml file
apiVersion: apps/v1
kind: Deployment
metadata:
name: zookeeper
namespace: {{ .Values.test.namespace }}
labels:
app: test-app
unit: zookeeper
spec:
replicas: 1
template:
metadata:
labels:
app: test-app
unit: zookeeper
parentdeployment: test-zookeeper
spec:
hostname: zookeeper
subdomain: zookeeper
securityContext:
fsGroup: {{ .Values.test.groupID }}
containers:
- name: zookeeper
image: test_zookeeper:{{ .Values.test.zookeeperImageTag }}
imagePullPolicy: IfNotPresent
ports:
- containerPort: 2281
env:
- name: IS_ZOOKEEPER_CLUSTER
value: 'false'
- name: ZOOKEEPER_SSL_CLIENT_PORT
value: '2281'
- name: ZOOKEEPER_DATA_DIR
value: /opt/zookeeper/data
- name: ZOOKEEPER_DATA_LOG_DIR
value: /opt/zookeeper/data/log
- name: ZOOKEEPER_KEYSTORE_PATH
value: /opt/zookeeper/conf/zookeeper.keystore.jks
- name: ZOOKEEPER_KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
name: zookeeper-secret
key: jkskey
- name: ZOOKEEPER_TRUSTSTORE_PATH
value: /opt/zookeeper/conf/zookeeper.truststore.jks
- name: ZOOKEEPER_TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
name: zookeeper-secret
key: jkskey
volumeMounts:
- mountPath: "/opt/zookeeper/data"
name: zookeeper-data-pv
- mountPath: "/opt/zookeeper/conf"
name: zookeeper-conf-pv
volumes:
- name: zookeeper-data-pv
persistentVolumeClaim:
claimName: zookeeper-data-pvc
- name: zookeeper-conf-pv
persistentVolumeClaim:
claimName: zookeeper-conf-pvc
selector:
matchLabels:
app: test-ra
unit: zookeeper
parentdeployment: test-zookeeper
kubectl describe for kafka also shows exposed nodeport
Type: NodePort
IP: 10.233.1.106
Port: kafka 9092/TCP
TargetPort: 9092/TCP
NodePort: kafka 30092/TCP
Endpoints: 10.233.66.15:9092
Session Affinity: None
External Traffic Policy: Cluster
I have a publisher binary that will send some messages into Kafka. As I am having a 3 node cluster deployment, I am using my primary node IP and Kafka node port (30092) to connect with the Kafka.
But my binary is getting dial tcp <primary_node_ip>:9092: connect: connection refused
error. I am unable to understand why is it getting rejected even after nodePort to targetPort conversion is successful. With the further debugging I am seeing the following debug logs in the kafka logs:
[2021-01-13 08:17:51,692] DEBUG Accepted connection from /10.233.125.0:1564 on /10.233.66.15:9092 and assigned it to processor 0, sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400] (kafka.network.Acceptor)
[2021-01-13 08:17:51,692] DEBUG Processor 0 listening to new connection from /10.233.125.0:1564 (kafka.network.Processor)
[2021-01-13 08:17:51,702] DEBUG [SslTransportLayer channelId=10.233.66.15:9092-10.233.125.0:1564-245 key=sun.nio.ch.SelectionKeyImpl@43dc2246] SSL peer is not authenticated, returning ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2021-01-13 08:17:51,702] DEBUG [SslTransportLayer channelId=10.233.66.15:9092-10.233.125.0:1564-245 key=sun.nio.ch.SelectionKeyImpl@43dc2246] SSL handshake completed successfully with peerHost '10.233.125.0' peerPort 1564 peerPrincipal 'User:ANONYMOUS' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256' (org.apache.kafka.common.network.SslTransportLayer)
[2021-01-13 08:17:51,702] DEBUG [SocketServer brokerId=1001] Successfully authenticated with /10.233.125.0 (org.apache.kafka.common.network.Selector)
[2021-01-13 08:17:51,707] DEBUG [SocketServer brokerId=1001] Connection with /10.233.125.0 disconnected (org.apache.kafka.common.network.Selector)
java.io.EOFException
at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:614)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:95)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
at kafka.network.Processor.poll(SocketServer.scala:861)
at kafka.network.Processor.run(SocketServer.scala:760)
at java.lang.Thread.run(Thread.java:748)
With the same configuration, I was able to expose other services. What am I missing here?
Update: When I added KAFKA_LISTENERS and KAFKA_ADV_LISTENERS for EXTERNAL and changed the targetPort to 30092, the error message during external connections disappeared, but started getting connection errors for internal connections.
Solution: I exposed another service for external communication like mentioned in the answer and exposed 30092 as the port and the node port for it. So there was no requirement of targetPort. I also had to add additional KAFKA_LISTENERS and KAFKA_ADV_LISTENERS in the deployment file for external communication
Upvotes: 2
Views: 1840
Reputation: 866
We faced a similar issue in one of our Kafka setups; we ended up creating two k8s services, one using ClusterIP for internal communication and second service with same labels using NodePort for external communication.
internal access
apiVersion: v1
kind: Service
metadata:
name: kafka-internal
namespace: test
labels:
app: kafka-test
unit: kafka
spec:
type: NodePort
selector:
app: test-app
unit: kafka
parentdeployment: test-kafka
ports:
- name: kafka
port: 9092
protocol: TCP
type: ClusterIP
external access
apiVersion: v1
kind: Service
metadata:
name: kafka-external
namespace: test
labels:
app: kafka-test
unit: kafka
spec:
type: NodePort
selector:
app: test-app
unit: kafka
parentdeployment: test-kafka
ports:
- name: kafka
port: 9092
targetPort: 9092
protocol: TCP
type: NodePort
Upvotes: 1