Reputation: 7633
I am trying to deploy Zookeeper and Kafka on Kubernetes using the confluentinc docker images. I based my solution on this question and this post. The Zookeeper is running without errors on the log. I want to deploy 3 Kafka brokers using StatefulSet
. The problem with my yaml
files is that I don't know how to configure the KAFKA_ADVERTISED_LISTENERS
property for Kafka when using 3 brokers.
Here is the yaml
files for zookeeper:
apiVersion: v1
kind: Service
metadata:
name: zookeeper
labels:
app: zookeeper
spec:
clusterIP: None
ports:
- name: client
port: 2181
protocol: TCP
- name: follower
port: 2888
protocol: TCP
- name: leader
port: 3888
protocol: TCP
selector:
app: zookeeper
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: zookeeper
spec:
replicas: 1
serviceName: zookeeper
selector:
matchLabels:
app: zookeeper # has to match .spec.template.metadata.labels
template:
metadata:
labels:
app: zookeeper # has to match .spec.selector.matchLabels
spec:
hostname: zookeeper
containers:
- name: zookeeper
image: confluentinc/cp-zookeeper:5.5.0
ports:
- containerPort: 2181
env:
- name: ZOOKEEPER_CLIENT_PORT
value: "2181"
- name: ZOOKEEPER_ID
value: "1"
- name: ZOOKEEPER_SERVER_1
value: zookeeper
and for the kafka broker:
apiVersion: v1
kind: Service
metadata:
name: kafka-service
labels:
app: kafka
spec:
type: LoadBalancer
ports:
- port: 9092
name: kafka-port
protocol: TCP
selector:
app: kafka
id: "0"
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
spec:
replicas: 3
serviceName: kafka
podManagementPolicy: OrderedReady
selector:
matchLabels:
app: kafka # has to match .spec.template.metadata.labels
template:
metadata:
labels:
app: kafka # has to match .spec.selector.matchLabels
spec:
containers:
- name: kafka
image: confluentinc/cp-enterprise-kafka:5.5.0
ports:
- containerPort: 9092
env:
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper:2181 # zookeeper-2.zookeeper.default.svc.cluster.local
- name: KAFKA_ADVERTISED_LISTENERS
value: "LISTENER_0://kafka-0:9092,LISTENER_1://kafka-1:9093,LISTENER_2://kafka-2:9094"
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: "LISTENER_0:PLAINTEXT,LISTENER_1:PLAINTEXT,LISTENER_2:PLAINTEXT"
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: LISTENER_0
I get the 3 kafka pods running, the kafka-0
is connecting but the kafka-1
and kafka-2
are not connecting.
$ kubectl get pods -o wide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
kafka-0 1/1 Running 0 4m12s 172.17.0.4 minikube <none> <none>
kafka-1 1/1 Running 5 4m9s 172.17.0.5 minikube <none> <none>
kafka-2 0/1 CrashLoopBackOff 4 4m7s 172.17.0.6 minikube <none> <none>
zookeeper-0 1/1 Running 0 21m 172.17.0.3 minikube <none> <none>
The error is saying that I already advertised kafka-0:9092,kafka-1:9093,kafka-2:9094
in the first pod kafka-0
. So, I suppose it has to be dynamic. How do I configure it?
[2020-09-30 14:56:40,519] ERROR [KafkaServer id=1017] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.IllegalArgumentException: requirement failed: Configured end points kafka-0:9092,kafka-1:9093,kafka-2:9094 in advertised listeners are already registered by broker 1012
at kafka.server.KafkaServer.$anonfun$createBrokerInfo$3(KafkaServer.scala:436)
at kafka.server.KafkaServer.$anonfun$createBrokerInfo$3$adapted(KafkaServer.scala:434)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at kafka.server.KafkaServer.createBrokerInfo(KafkaServer.scala:434)
at kafka.server.KafkaServer.startup(KafkaServer.scala:293)
at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:114)
at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:66)
Upvotes: 1
Views: 1199
Reputation: 7633
I have been reading this blog post "Kafka Listeners - Explained" and I was able to configure 3 Kafka brokers with the following configuration.
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
spec:
replicas: 3
serviceName: kafka
podManagementPolicy: OrderedReady
selector:
matchLabels:
app: kafka # has to match .spec.template.metadata.labels
template:
metadata:
labels:
app: kafka # has to match .spec.selector.matchLabels
spec:
restartPolicy: Always
containers:
- name: kafka
image: confluentinc/cp-enterprise-kafka:5.5.0
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits: # limit of 0.5 cpu and 512MiB of memory
memory: "512Mi"
cpu: "500m"
# imagePullPolicy: Always
ports:
- containerPort: 9092
name: kafka-0
- containerPort: 9093
name: kafka-1
- containerPort: 9094
name: kafka-2
env:
- name: MY_METADATA_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: MY_POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: STAS_DELAY
value: "120"
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper:2181 # zookeeper-2.zookeeper.default.svc.cluster.local
- name: KAFKA_ADVERTISED_LISTENERS
value: "INSIDE://$(MY_POD_IP):9092"
- name: KAFKA_LISTENERS
value: "INSIDE://$(MY_POD_IP):9092"
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: "INSIDE:PLAINTEXT"
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: "INSIDE"
Upvotes: 3