Arjun Ajith

Reputation: 1920

Kafka connection refused with Kubernetes nodeport

I am trying to expose Kafka in my Kubernetes setup for external access using a NodePort.

My Helm chart's kafka-service.yaml is as follows:

apiVersion: v1
kind: Service
metadata:
  name: kafka
  namespace: test
  labels:
    app: kafka-test
    unit: kafka
spec:
  type: NodePort
  selector:
    app: test-app
    unit: kafka
    parentdeployment: test-kafka
  ports:
    - name: kafka
      port: 9092
      targetPort: 9092
      nodePort: 30092
      protocol: TCP

kafka-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka
  namespace: {{ .Values.test.namespace }}
  labels:
    app: test-app
    unit: kafka
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: test-app
        unit: kafka
        parentdeployment: test-kafka
    spec:
      hostname: kafka
      subdomain: kafka
      securityContext:
        fsGroup: {{ .Values.test.groupID }}
      containers:
        - name: kafka
          image: test_kafka:{{ .Values.test.kafkaImageTag }}
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: IS_KAFKA_CLUSTER
              value: 'false'
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper:2281
            - name: KAFKA_LISTENERS
              value: SSL://:9092
            - name: KAFKA_KEYSTORE_PATH
              value: /opt/kafka/conf/kafka.keystore.jks
            - name: KAFKA_TRUSTSTORE_PATH
              value: /opt/kafka/conf/kafka.truststore.jks
            - name: KAFKA_KEYSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-secret
                  key: jkskey
            - name: KAFKA_TRUSTSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-secret
                  key: jkskey
            - name: KAFKA_LOG_DIRS
              value: /opt/kafka/data
            - name: KAFKA_ADV_LISTENERS
              value: SSL://kafka:9092
            - name: KAFKA_CLIENT_AUTH
              value: none
          volumeMounts:
            - mountPath: "/opt/kafka/conf"
              name: kafka-conf-pv
            - mountPath: "/opt/kafka/data"
              name: kafka-data-pv
      volumes:
        - name: kafka-conf-pv
          persistentVolumeClaim:
            claimName: kafka-conf-pvc
        - name: kafka-data-pv
          persistentVolumeClaim:
            claimName: kafka-data-pvc
  selector:
    matchLabels:
      app: test-app
      unit: kafka
      parentdeployment: test-kafka

zookeeper-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: zookeeper
  namespace: {{ .Values.test.namespace }}
  labels:
    app: test-ra
    unit: zookeeper
spec:
  type: ClusterIP
  selector:
    app: test-ra
    unit: zookeeper
    parentdeployment: test-zookeeper
  ports:
    - name: zookeeper
      port: 2281

zookeeper-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper
  namespace: {{ .Values.test.namespace }}
  labels:
    app: test-app
    unit: zookeeper
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: test-app
        unit: zookeeper
        parentdeployment: test-zookeeper
    spec:
      hostname: zookeeper
      subdomain: zookeeper
      securityContext:
        fsGroup: {{ .Values.test.groupID }}
      containers:
        - name: zookeeper
          image: test_zookeeper:{{ .Values.test.zookeeperImageTag }}
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 2281
          env:
            - name: IS_ZOOKEEPER_CLUSTER
              value: 'false'
            - name: ZOOKEEPER_SSL_CLIENT_PORT
              value: '2281'
            - name: ZOOKEEPER_DATA_DIR
              value: /opt/zookeeper/data
            - name: ZOOKEEPER_DATA_LOG_DIR
              value: /opt/zookeeper/data/log
            - name: ZOOKEEPER_KEYSTORE_PATH
              value: /opt/zookeeper/conf/zookeeper.keystore.jks
            - name: ZOOKEEPER_KEYSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: zookeeper-secret
                  key: jkskey
            - name: ZOOKEEPER_TRUSTSTORE_PATH
              value: /opt/zookeeper/conf/zookeeper.truststore.jks
            - name: ZOOKEEPER_TRUSTSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: zookeeper-secret
                  key: jkskey
          volumeMounts:
            - mountPath: "/opt/zookeeper/data"
              name: zookeeper-data-pv
            - mountPath: "/opt/zookeeper/conf"
              name: zookeeper-conf-pv
      volumes:
        - name: zookeeper-data-pv
          persistentVolumeClaim:
            claimName: zookeeper-data-pvc
        - name: zookeeper-conf-pv
          persistentVolumeClaim:
            claimName: zookeeper-conf-pvc
  selector:
    matchLabels:
      app: test-ra
      unit: zookeeper
      parentdeployment: test-zookeeper

kubectl describe for the kafka service also shows the exposed NodePort:

Type:                     NodePort
IP:                       10.233.1.106
Port:                     kafka  9092/TCP
TargetPort:               9092/TCP
NodePort:                 kafka  30092/TCP
Endpoints:                10.233.66.15:9092
Session Affinity:         None
External Traffic Policy:  Cluster

I have a publisher binary that sends messages into Kafka. Since this is a 3-node cluster deployment, I am using my primary node's IP and the Kafka node port (30092) to connect to Kafka.

But my binary gets a dial tcp <primary_node_ip>:9092: connect: connection refused error. I am unable to understand why the connection is refused even though the nodePort-to-targetPort forwarding works. On further debugging, I see the following entries in the Kafka logs:

[2021-01-13 08:17:51,692] DEBUG Accepted connection from /10.233.125.0:1564 on /10.233.66.15:9092 and assigned it to processor 0, sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400] (kafka.network.Acceptor)
[2021-01-13 08:17:51,692] DEBUG Processor 0 listening to new connection from /10.233.125.0:1564 (kafka.network.Processor)
[2021-01-13 08:17:51,702] DEBUG [SslTransportLayer channelId=10.233.66.15:9092-10.233.125.0:1564-245 key=sun.nio.ch.SelectionKeyImpl@43dc2246] SSL peer is not authenticated, returning ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2021-01-13 08:17:51,702] DEBUG [SslTransportLayer channelId=10.233.66.15:9092-10.233.125.0:1564-245 key=sun.nio.ch.SelectionKeyImpl@43dc2246] SSL handshake completed successfully with peerHost '10.233.125.0' peerPort 1564 peerPrincipal 'User:ANONYMOUS' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256' (org.apache.kafka.common.network.SslTransportLayer)
[2021-01-13 08:17:51,702] DEBUG [SocketServer brokerId=1001] Successfully authenticated with /10.233.125.0 (org.apache.kafka.common.network.Selector)
[2021-01-13 08:17:51,707] DEBUG [SocketServer brokerId=1001] Connection with /10.233.125.0 disconnected (org.apache.kafka.common.network.Selector)
java.io.EOFException
        at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:614)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:95)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
        at kafka.network.Processor.poll(SocketServer.scala:861)
        at kafka.network.Processor.run(SocketServer.scala:760)
        at java.lang.Thread.run(Thread.java:748)

With the same configuration, I was able to expose other services. What am I missing here?

Update: When I added KAFKA_LISTENERS and KAFKA_ADV_LISTENERS entries for an EXTERNAL listener and changed the targetPort to 30092, the error during external connections disappeared, but internal connections then started failing.
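For reference, a minimal sketch of what such dual listener entries in kafka-deployment.yaml might look like. This assumes the custom test_kafka image maps KAFKA_LISTENERS and KAFKA_ADV_LISTENERS directly onto Kafka's listeners and advertised.listeners settings and allows the EXTERNAL listener name to be mapped to SSL in listener.security.protocol.map; the EXTERNAL listener name and the <node_ip> placeholder are illustrative, not taken from the image:

            - name: KAFKA_LISTENERS
              # internal SSL listener plus an assumed EXTERNAL listener bound to the node port
              value: SSL://:9092,EXTERNAL://:30092
            - name: KAFKA_ADV_LISTENERS
              # <node_ip> is a placeholder for the externally reachable node address
              value: SSL://kafka:9092,EXTERNAL://<node_ip>:30092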

Solution: I exposed another service for external communication, as described in the answer below, using 30092 as both the port and the node port, so no targetPort was required. I also had to add additional KAFKA_LISTENERS and KAFKA_ADV_LISTENERS entries in the deployment file for external communication.
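A sketch of that external service, reconstructed from the description above (the service name is illustrative; with port: 30092 and no targetPort, the targetPort defaults to 30092, which is the container port the new EXTERNAL listener is bound to):

apiVersion: v1
kind: Service
metadata:
  name: kafka-external   # illustrative name
  namespace: test
  labels:
    app: kafka-test
    unit: kafka
spec:
  type: NodePort
  selector:
    app: test-app
    unit: kafka
    parentdeployment: test-kafka
  ports:
    - name: kafka-external
      port: 30092        # targetPort defaults to port, so the broker must listen on 30092
      nodePort: 30092
      protocol: TCP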

Upvotes: 2

Views: 1840

Answers (1)

hdhruna

Reputation: 866

We faced a similar issue in one of our Kafka setups; we ended up creating two k8s services: one using ClusterIP for internal communication, and a second service with the same labels using NodePort for external communication.

internal access

apiVersion: v1
kind: Service
metadata:
  name: kafka-internal
  namespace: test
  labels:
    app: kafka-test
    unit: kafka
spec:
  type: ClusterIP
  selector:
    app: test-app
    unit: kafka
    parentdeployment: test-kafka
  ports:
    - name: kafka
      port: 9092
      protocol: TCP

external access

apiVersion: v1
kind: Service
metadata:
  name: kafka-external
  namespace: test
  labels:
    app: kafka-test
    unit: kafka
spec:
  type: NodePort
  selector:
    app: test-app
    unit: kafka
    parentdeployment: test-kafka
  ports:
    - name: kafka
      port: 9092
      targetPort: 9092
      protocol: TCP

Upvotes: 1
