Felipe

Reputation: 7633

Error when using StatefulSet object from Kubernetes to find the kafka broker without a static IP

I am trying to deploy ZooKeeper and Kafka on Kubernetes using the confluentinc Docker images. I based my solution on this question and this post. ZooKeeper is running with no errors in its log. I want to deploy 3 Kafka brokers using a StatefulSet. The problem with my YAML files is that I don't know how to configure the KAFKA_ADVERTISED_LISTENERS property when running 3 brokers.

Here are the YAML files for ZooKeeper:

apiVersion: v1
kind: Service
metadata:
  name: zookeeper
  labels:
    app: zookeeper
spec:
  clusterIP: None
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper
spec:
  replicas: 1
  serviceName: zookeeper
  selector:
    matchLabels:
      app: zookeeper # has to match .spec.template.metadata.labels
  template:
    metadata:
      labels:
        app: zookeeper # has to match .spec.selector.matchLabels
    spec:
      hostname: zookeeper
      containers:
      - name: zookeeper
        image: confluentinc/cp-zookeeper:5.5.0
        ports:
        - containerPort: 2181
        env:
        - name: ZOOKEEPER_CLIENT_PORT
          value: "2181"
        - name: ZOOKEEPER_ID
          value: "1"
        - name: ZOOKEEPER_SERVER_1
          value: zookeeper

And for the Kafka brokers:

apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  labels:
    app: kafka
spec:
  type: LoadBalancer
  ports:
  - port: 9092
    name: kafka-port
    protocol: TCP
  selector:
    app: kafka
    id: "0"
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  replicas: 3
  serviceName: kafka
  podManagementPolicy: OrderedReady
  selector:
    matchLabels:
      app: kafka # has to match .spec.template.metadata.labels
  template:
    metadata:
      labels:
        app: kafka # has to match .spec.selector.matchLabels
    spec:
      containers:
      - name: kafka
        image: confluentinc/cp-enterprise-kafka:5.5.0
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper:2181 # zookeeper-2.zookeeper.default.svc.cluster.local
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "LISTENER_0://kafka-0:9092,LISTENER_1://kafka-1:9093,LISTENER_2://kafka-2:9094"
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: "LISTENER_0:PLAINTEXT,LISTENER_1:PLAINTEXT,LISTENER_2:PLAINTEXT"
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: LISTENER_0

All 3 Kafka pods are created. kafka-0 connects, but kafka-1 and kafka-2 fail to start and keep restarting.

$ kubectl get pods -o wide
NAME          READY   STATUS             RESTARTS   AGE     IP           NODE       NOMINATED NODE   READINESS GATES
kafka-0       1/1     Running            0          4m12s   172.17.0.4   minikube   <none>           <none>
kafka-1       1/1     Running            5          4m9s    172.17.0.5   minikube   <none>           <none>
kafka-2       0/1     CrashLoopBackOff   4          4m7s    172.17.0.6   minikube   <none>           <none>
zookeeper-0   1/1     Running            0          21m     172.17.0.3   minikube   <none>           <none>

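The error says that the endpoints kafka-0:9092,kafka-1:9093,kafka-2:9094 were already registered as advertised listeners by another broker (the first pod, kafka-0). Since every pod in the StatefulSet gets the same value from the template, I suppose the setting has to be dynamic per pod. How do I configure it?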

[2020-09-30 14:56:40,519] ERROR [KafkaServer id=1017] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.IllegalArgumentException: requirement failed: Configured end points kafka-0:9092,kafka-1:9093,kafka-2:9094 in advertised listeners are already registered by broker 1012
    at kafka.server.KafkaServer.$anonfun$createBrokerInfo$3(KafkaServer.scala:436)
    at kafka.server.KafkaServer.$anonfun$createBrokerInfo$3$adapted(KafkaServer.scala:434)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at kafka.server.KafkaServer.createBrokerInfo(KafkaServer.scala:434)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:293)
    at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:114)
    at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:66)

Upvotes: 1

Views: 1199

Answers (1)

Felipe

Reputation: 7633

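After reading the blog post "Kafka Listeners - Explained", I was able to run 3 Kafka brokers with the configuration below. Each broker now advertises its own pod IP, injected through the Downward API (status.podIP), so no two brokers register the same endpoint.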

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  replicas: 3
  serviceName: kafka
  podManagementPolicy: OrderedReady
  selector:
    matchLabels:
      app: kafka # has to match .spec.template.metadata.labels
  template:
    metadata:
      labels:
        app: kafka # has to match .spec.selector.matchLabels
    spec:
      restartPolicy: Always
      containers:
      - name: kafka
        image: confluentinc/cp-enterprise-kafka:5.5.0
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits: # limit of 0.5 cpu and 512MiB of memory
            memory: "512Mi"
            cpu: "500m"
        # imagePullPolicy: Always
        ports:
        - containerPort: 9092
          name: kafka-0
        - containerPort: 9093
          name: kafka-1
        - containerPort: 9094
          name: kafka-2
        env:
        - name: MY_METADATA_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: MY_POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: STAS_DELAY
          value: "120"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper:2181 # zookeeper-2.zookeeper.default.svc.cluster.local
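        - name: KAFKA_ADVERTISED_LISTENERS
          # Kubernetes expands $(MY_POD_IP) from the env var defined above,
          # so every broker advertises a unique endpoint.
          value: "INSIDE://$(MY_POD_IP):9092"
        - name: KAFKA_LISTENERS
          value: "INSIDE://$(MY_POD_IP):9092"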
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: "INSIDE:PLAINTEXT"
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: "INSIDE"
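
Pod IPs change whenever a pod is rescheduled, so a possible variant is to advertise the stable per-pod DNS name that a StatefulSet provides instead. That requires a headless Service whose name matches serviceName (kafka above). The Service below is a minimal, untested sketch and is not part of my original setup; the port name "inside" is just an illustrative choice.

```yaml
# Hypothetical headless Service. Its name matches "serviceName: kafka"
# in the StatefulSet, which gives each pod a stable DNS record such as
# kafka-0.kafka.<namespace>.svc.cluster.local.
apiVersion: v1
kind: Service
metadata:
  name: kafka
  labels:
    app: kafka
spec:
  clusterIP: None   # headless: DNS resolves to the individual pod IPs
  ports:
  - name: inside    # illustrative name, not taken from the original config
    port: 9092
    protocol: TCP
  selector:
    app: kafka
```

With such a Service in place, KAFKA_ADVERTISED_LISTENERS could presumably be set to "INSIDE://$(MY_METADATA_NAME).kafka:9092", reusing the MY_METADATA_NAME variable that is already defined from metadata.name. Clients in the same namespace would then reach each broker by name (kafka-0.kafka, kafka-1.kafka, kafka-2.kafka) rather than by IP.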

Upvotes: 3
