I am currently using the bitnami/kafka image (https://hub.docker.com/r/bitnami/kafka) and deploying it on Kubernetes.
Within the cluster, other applications are able to find Kafka. The problem occurs when trying to access the Kafka container from outside the cluster. After reading a little, I learned that external Kafka clients need the property "advertised.listeners=PLAINTEXT://hostname:port_number" to be set.
I am using "https://github.com/bitnami/charts/tree/master/bitnami/kafka" as a reference. I added a value to my values.yaml file and the following to statefulset.yaml:
```yaml
- name: KAFKA_CFG_ADVERTISED_LISTENERS
  value: 'PLAINTEXT://{{ .Values.advertisedListeners }}:9092'
```
For a single Kafka instance this works fine.
But for a 3-node Kafka cluster, I changed the configuration in values.yaml and statefulset.yaml like this:
```yaml
- name: KAFKA_CFG_ADVERTISED_LISTENERS
  {{- if $MY_POD_NAME := "kafka-0" }}
  value: 'PLAINTEXT://{{ .Values.advertisedListeners1 }}:9092'
  {{- else if $MY_POD_NAME := "kafka-1" }}
  value: 'PLAINTEXT://{{ .Values.advertisedListeners2 }}:9092'
  {{- else if $MY_POD_NAME := "kafka-2" }}
  value: 'PLAINTEXT://{{ .Values.advertisedListeners3 }}:9092'
  {{- end }}
```
The expected result is that each of the 3 Kafka brokers gets its advertised.listeners property set to the IP address of a worker node, for example:

kafka-0 --> "PLAINTEXT://10.21.0.191:9092"
kafka-1 --> "PLAINTEXT://10.21.0.192:9092"
kafka-2 --> "PLAINTEXT://10.21.0.193:9092"
Currently only one Kafka pod is up and running; the other two go into CrashLoopBackOff and show this error:
```
[2019-10-20 13:09:37,753] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2019-10-20 13:09:37,786] ERROR [KafkaServer id=1002] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.IllegalArgumentException: requirement failed: Configured end points 10.21.0.191:9092 in advertised listeners are already registered by broker 1001
    at scala.Predef$.require(Predef.scala:224)
    at kafka.server.KafkaServer$$anonfun$createBrokerInfo$2.apply(KafkaServer.scala:399)
    at kafka.server.KafkaServer$$anonfun$createBrokerInfo$2.apply(KafkaServer.scala:397)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at kafka.server.KafkaServer.createBrokerInfo(KafkaServer.scala:397)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:261)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
    at kafka.Kafka$.main(Kafka.scala:84)
    at kafka.Kafka.main(Kafka.scala)
```
This suggests that the conditional logic in statefulset.yaml is not working. Can anyone help me resolve this?
Any help would be appreciated.
The output of `kubectl get statefulset kafka -o yaml`:

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  creationTimestamp: "2019-10-29T07:04:12Z"
  generation: 1
  labels:
    app.kubernetes.io/component: kafka
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/managed-by: Tiller
    app.kubernetes.io/name: kafka
    helm.sh/chart: kafka-6.0.1
  name: kafka
  namespace: default
  resourceVersion: "12189730"
  selfLink: /apis/apps/v1/namespaces/default/statefulsets/kafka
  uid: d40cfd5f-46a6-49d0-a9d3-e3a851356063
spec:
  podManagementPolicy: Parallel
  replicas: 3
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app.kubernetes.io/component: kafka
      app.kubernetes.io/instance: kafka
      app.kubernetes.io/name: kafka
  serviceName: kafka-headless
  template:
    metadata:
      creationTimestamp: null
      labels:
        app.kubernetes.io/component: kafka
        app.kubernetes.io/instance: kafka
        app.kubernetes.io/managed-by: Tiller
        app.kubernetes.io/name: kafka
        helm.sh/chart: kafka-6.0.1
      name: kafka
    spec:
      containers:
      - env:
        - name: MY_POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        - name: KAFKA_CFG_ZOOKEEPER_CONNECT
          value: kafka-zookeeper
        - name: KAFKA_PORT_NUMBER
          value: "9092"
        - name: KAFKA_CFG_LISTENERS
          value: PLAINTEXT://:$(KAFKA_PORT_NUMBER)
        - name: KAFKA_CFG_ADVERTISED_LISTENERS
          value: PLAINTEXT://10.21.0.191:9092
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        - name: KAFKA_CFG_BROKER_ID
          value: "-1"
        - name: KAFKA_CFG_DELETE_TOPIC_ENABLE
          value: "false"
        - name: KAFKA_HEAP_OPTS
          value: -Xmx1024m -Xms1024m
        - name: KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES
          value: "10000"
        - name: KAFKA_CFG_LOG_FLUSH_INTERVAL_MS
          value: "1000"
        - name: KAFKA_CFG_LOG_RETENTION_BYTES
          value: "1073741824"
        - name: KAFKA_CFG_LOG_RETENTION_CHECK_INTERVALS_MS
          value: "300000"
        - name: KAFKA_CFG_LOG_RETENTION_HOURS
          value: "168"
        - name: KAFKA_CFG_LOG_MESSAGE_FORMAT_VERSION
        - name: KAFKA_CFG_MESSAGE_MAX_BYTES
          value: "1000012"
        - name: KAFKA_CFG_LOG_SEGMENT_BYTES
          value: "1073741824"
        - name: KAFKA_CFG_LOG_DIRS
          value: /bitnami/kafka/data
        - name: KAFKA_CFG_DEFAULT_REPLICATION_FACTOR
          value: "1"
        - name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "1"
        - name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
          value: "1"
        - name: KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
          value: https
        - name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR
          value: "1"
        - name: KAFKA_CFG_NUM_IO_THREADS
          value: "8"
        - name: KAFKA_CFG_NUM_NETWORK_THREADS
          value: "3"
        - name: KAFKA_CFG_NUM_PARTITIONS
          value: "1"
        - name: KAFKA_CFG_NUM_RECOVERY_THREADS_PER_DATA_DIR
          value: "1"
        - name: KAFKA_CFG_SOCKET_RECEIVE_BUFFER_BYTES
          value: "102400"
        - name: KAFKA_CFG_SOCKET_REQUEST_MAX_BYTES
          value: "104857600"
        - name: KAFKA_CFG_SOCKET_SEND_BUFFER_BYTES
          value: "102400"
        - name: KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS
          value: "6000"
        image: docker.io/bitnami/kafka:2.3.0-debian-9-r88
        imagePullPolicy: IfNotPresent
        livenessProbe:
          failureThreshold: 2
          initialDelaySeconds: 10
          periodSeconds: 10
          successThreshold: 1
          tcpSocket:
            port: kafka
          timeoutSeconds: 5
        name: kafka
        ports:
        - containerPort: 9092
          name: kafka
          protocol: TCP
        readinessProbe:
          failureThreshold: 6
          initialDelaySeconds: 5
          periodSeconds: 10
          successThreshold: 1
          tcpSocket:
            port: kafka
          timeoutSeconds: 5
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /bitnami/kafka
          name: data
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        fsGroup: 1001
        runAsUser: 1001
      terminationGracePeriodSeconds: 30
  updateStrategy:
    type: RollingUpdate
  volumeClaimTemplates:
  - metadata:
      creationTimestamp: null
      name: data
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 8Gi
      volumeMode: Filesystem
    status:
      phase: Pending
status:
  collisionCount: 0
  currentReplicas: 3
  currentRevision: kafka-56ff499d74
  observedGeneration: 1
  readyReplicas: 1
  replicas: 3
  updateRevision: kafka-56ff499d74
  updatedReplicas: 3
```
I think the Helm chart doesn't whitelist your external (to Kubernetes) network for advertised.listeners. I solved a similar issue by reconfiguring the Helm values.yaml like this. In my case (on a Mac) the external address is 127.0.0.1; yours might be different:
```yaml
externalAccess:
  enabled: true
  autoDiscovery:
    enabled: false
    image:
      registry: docker.io
      repository: bitnami/kubectl
      tag: 1.23.4-debian-10-r17
      pullPolicy: IfNotPresent
      pullSecrets: []
    resources:
      limits: {}
      requests: {}
  service:
    type: NodePort
    port: 9094
    loadBalancerIPs: []
    loadBalancerSourceRanges: []
    nodePorts:
      - 30000
      - 30001
      - 30002
    useHostIPs: false
    annotations: {}
    domain: 127.0.0.1
```
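The point of this configuration is that every broker ends up with its own external endpoint, which is what the setup in the question lacked. As I understand the Bitnami chart (an assumption; check the scripts ConfigMap template of your chart version), with `externalAccess.service.type: NodePort` broker N advertises `externalAccess.service.domain` together with the N-th entry of `nodePorts` on a listener named `EXTERNAL`. The helper `external_listener` below is hypothetical; it only reproduces that mapping so you can see which address each broker should advertise and which one an external client should connect to.

```shell
#!/bin/bash
# Sketch (assumption about the chart's behaviour): broker N advertises
# EXTERNAL://<domain>:<N-th nodePort>. external_listener is a hypothetical helper.
set -euo pipefail

external_listener() {
  local pod_name="$1" domain="$2"
  shift 2
  local node_ports=("$@")               # e.g. 30000 30001 30002, one per broker
  local ordinal="${pod_name##*-}"       # StatefulSet ordinal: "kafka-1" -> "1"
  if [[ ! "$ordinal" =~ ^[0-9]+$ ]] || (( ordinal >= ${#node_ports[@]} )); then
    echo "no node port configured for ${pod_name}" >&2
    return 1
  fi
  printf 'EXTERNAL://%s:%s\n' "$domain" "${node_ports[ordinal]}"
}

# Print the expected external endpoint of each broker for the values above.
for pod in kafka-0 kafka-1 kafka-2; do
  echo "${pod} -> $(external_listener "$pod" 127.0.0.1 30000 30001 30002)"
done
```

With the values above this prints `kafka-0 -> EXTERNAL://127.0.0.1:30000`, `kafka-1 -> EXTERNAL://127.0.0.1:30001` and `kafka-2 -> EXTERNAL://127.0.0.1:30002`: three distinct endpoints, so no broker tries to register an address another broker already owns.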
I see you are having trouble passing different environment variables to different pods in a StatefulSet.
You are trying to achieve this using Helm templates:
```yaml
- name: KAFKA_CFG_ADVERTISED_LISTENERS
  {{- if $MY_POD_NAME := "kafka-0" }}
  value: 'PLAINTEXT://{{ .Values.advertisedListeners1 }}:9092'
  {{- else if $MY_POD_NAME := "kafka-1" }}
  value: 'PLAINTEXT://{{ .Values.advertisedListeners2 }}:9092'
  {{- else if $MY_POD_NAME := "kafka-2" }}
  value: 'PLAINTEXT://{{ .Values.advertisedListeners3 }}:9092'
  {{- end }}
```
The Helm template guide documentation explains this:
In Helm templates, a variable is a named reference to another object. It follows the form $name. Variables are assigned with a special assignment operator: :=.
Now let's look at your code:

```
{{- if $MY_POD_NAME := "kafka-0" }}
```

This is a variable assignment, not a comparison. The value of the assignment is the non-empty string "kafka-0", which the `if` statement evaluates as true, so the first branch is always rendered. That's why your StatefulSet manifest contains this output:

```yaml
- name: KAFKA_CFG_ADVERTISED_LISTENERS
  value: PLAINTEXT://10.21.0.191:9092
```
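To make it work as expected, you can't rely on Helm templating. Helm renders the template once for the whole StatefulSet before any pod exists, while MY_POD_NAME is set by Kubernetes only when each pod starts, so even a real comparison (`eq`) would give every pod the same value.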
One way to do it is to create a separate environment variable for every Kafka node and pass all of these variables to all pods, like this:
```yaml
- env:
  - name: MY_POD_NAME
    valueFrom:
      fieldRef:
        apiVersion: v1
        fieldPath: metadata.name
  - name: KAFKA_0
    value: 10.21.0.191
  - name: KAFKA_1
    value: 10.21.0.192
  - name: KAFKA_2
    value: 10.21.0.193
  # - name: KAFKA_CFG_ADVERTISED_LISTENERS
  #   value: PLAINTEXT://$MY_POD_NAME:9092
```
Then build your own Docker image with a modified startup script that exports the KAFKA_CFG_ADVERTISED_LISTENERS variable with the appropriate value, depending on MY_POD_NAME.
If you don't want to build your own image, you can create a ConfigMap with a modified entrypoint.sh and mount it in place of the old entrypoint.sh (you can also use any other file; take a look here for more information on how the Kafka image is built).
Mounting the ConfigMap looks like this:
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: test
spec:
  containers:
  - name: test-container
    image: docker.io/bitnami/kafka:2.3.0-debian-9-r88
    volumeMounts:
    - name: config-volume
      mountPath: /entrypoint.sh
      subPath: entrypoint.sh
  volumes:
  - name: config-volume
    configMap:
      # Provide the name of the ConfigMap containing the files you want
      # to add to the container
      name: kafka-entrypoint-config
      defaultMode: 0744 # remember to add proper (executable) permissions
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-entrypoint-config
  namespace: default
data:
  entrypoint.sh: |
    #!/bin/bash
    # Here add modified entrypoint script
```
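The per-pod logic for the modified startup script could look roughly like the bash sketch below. It is an assumption, not the Bitnami image's own script: it expects MY_POD_NAME (for example `kafka-1`, from `metadata.name`) and one `KAFKA_<ordinal>` variable per broker (`KAFKA_0`, `KAFKA_1`, `KAFKA_2`) as in the env block above, it takes port 9092 from the question, and the function names `advertised_listener` and `export_advertised_listener` are hypothetical.

```shell
#!/bin/bash
# Sketch only: derive this broker's advertised listener from its pod name.
# Assumes MY_POD_NAME and KAFKA_0/KAFKA_1/KAFKA_2 are set as in the env block above.
set -euo pipefail

# Print the advertised listener for a StatefulSet pod name such as "kafka-1".
advertised_listener() {
  local pod_name="$1"
  local ordinal="${pod_name##*-}"        # "kafka-1" -> "1"
  if [[ ! "$ordinal" =~ ^[0-9]+$ ]]; then
    echo "cannot derive an ordinal from pod name '${pod_name}'" >&2
    return 1
  fi
  local var="KAFKA_${ordinal}"           # -> "KAFKA_1"
  local host="${!var:-}"                 # indirect expansion: the value of $KAFKA_1
  if [[ -z "$host" ]]; then
    echo "no address configured for ${pod_name} (expected \$${var})" >&2
    return 1
  fi
  printf 'PLAINTEXT://%s:9092\n' "$host"
}

# Export the per-broker value; fails fast if MY_POD_NAME or the address is missing.
export_advertised_listener() {
  local listener
  listener="$(advertised_listener "${MY_POD_NAME:?MY_POD_NAME must be set}")"
  export KAFKA_CFG_ADVERTISED_LISTENERS="$listener"
}
```

Put it near the top of the modified script and call `export_advertised_listener` before the part that starts Kafka. In pod `kafka-1` the exported value would be `PLAINTEXT://10.21.0.192:9092`, so each broker should register a distinct endpoint instead of all three claiming `10.21.0.191:9092`.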
Please let me know if it helped.