Reputation: 63
I have a test cluster with 3 Kafka instances in KRaft mode. Each instance acts as both broker and controller. Inter-broker communication is secured with SSL certificates.
After cluster startup, each instance knows about the others. The problems started when I created a new topic with replicas.
I create it with this command:
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=ofd-kafka-0:9092 --create --topic containers --partitions 3 --replication-factor 3
The topic exists, but it seems that Kafka gets stuck adding the replicas. kafka-topics.sh --describe shows:
Topic: containers TopicId: ZrOENsloQXuqTn2jt4NbNA PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=2160000,max.message.bytes=15728640,retention.bytes=-1
Topic: containers Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1 Adding Replicas: 0,2 Removing Replicas:
Topic: containers Partition: 1 Leader: 2 Replicas: 0,1,2 Isr: 2 Adding Replicas: 0,1 Removing Replicas:
Topic: containers Partition: 2 Leader: 0 Replicas: 2,1,0 Isr: 0 Adding Replicas: 1,2 Removing Replicas:
In each Kafka instance's log I can see warnings like these:
[2023-06-21 14:02:31,827] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Disconnecting from node 1 due to socket connection setup timeout. The timeout value is 8458 ms. (org.apache.kafka.clients.NetworkClient)
[2023-06-21 14:02:31,827] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Client requested connection close from node 1 (org.apache.kafka.clients.NetworkClient)
[2023-06-21 14:02:31,827] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: (org.apache.kafka.clients.FetchSessionHandler)
java.net.SocketTimeoutException: Failed to connect within 30000 ms
at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:109)
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:78)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:309)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:124)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:123)
at scala.Option.foreach(Option.scala:407)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:123)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:106)
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:97)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2023-06-21 14:02:31,827] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={containers-1=PartitionData(topicId=wA7p8AUSRCmvg-Nc4qL9hA, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[1], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.net.SocketTimeoutException: Failed to connect within 30000 ms
at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:109)
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:78)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:309)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:124)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:123)
at scala.Option.foreach(Option.scala:407)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:123)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:106)
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:97)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
It seems like the instances try to sync with each other but get disconnected. This is the final state.
My current setup looks like this:
apiVersion: apps/v1
kind: StatefulSet
...
spec:
  template:
    spec:
      containers:
        - args:
            - -ec
            - |
              export KAFKA_CFG_NODE_ID="$(echo "$MY_POD_NAME" | grep -o -E '[0-9]+$')"
              /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
          command:
            - /bin/bash
          env:
            - name: BITNAMI_DEBUG
              value: "true"
            - name: KAFKA_KRAFT_CLUSTER_ID
              value: "Y2MyZmRlNDY3MjM0NGU4Yj"
            - name: KAFKA_CFG_PROCESS_ROLES
              value: controller,broker
            - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
              value: 0@kafka-inst-0:9093,1@kafka-inst-1:9093,2@kafka-inst-2:9093
            - name: KAFKA_CFG_LISTENERS
              value: PLAINTEXT://:9092,CONTROLLER://:9093,INTERNAL://:9094
            - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
              value: CONTROLLER
            - name: KAFKA_HEAP_OPTS
              value: -Xmx1G -Xms1G
            - name: ALLOW_PLAINTEXT_LISTENER
              value: "yes"
            - name: KAFKA_CFG_MIN_INSYNC_REPLICAS
              value: "2"
            - name: KAFKA_CFG_DEFAULT_REPLICATION_FACTOR
              value: "3"
            - name: MY_POD_NAME
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.name
            - name: KAFKA_CFG_ADVERTISED_LISTENERS
              value: INTERNAL://$(MY_POD_NAME).test.svc.cluster.local:9094,PLAINTEXT://$(MY_POD_NAME).test.svc.cluster.local:9092
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: INTERNAL
            - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
              value: INTERNAL:SSL,CONTROLLER:SSL,PLAINTEXT:PLAINTEXT
            - name: KAFKA_TLS_TYPE
              value: PEM
            - name: KAFKA_CFG_INITIAL_BROKER_REGISTRATION_TIMEOUT_MS
              value: "240000"
          image: bitnami/kafka:3.3.2-debian-11-r49
          imagePullPolicy: IfNotPresent
...
I have the same problem with different Kafka versions (3.3.2, 3.4.1) and image releases. The cluster works fine until I add replicas.
Does anybody have a clue what is wrong? Any hints would be valuable as well.
Many thanks!
I took inspiration from the blog and adapted the scripts for my needs. I assemble a self-signed CA cert in the Helm chart via the genCA function if the secret with the CA cert is missing; if it exists, it is left as is. This is enough for development purposes. Later, especially in production, I will add handling of the CA cert's lifetime via cert-manager.
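For illustration, a minimal sketch of that Helm template pattern; the secret name kafka-ca and the validity period are placeholders, not my actual chart:
{{- /* Reuse an existing CA secret if present, otherwise generate a self-signed CA. */}}
{{- if not (lookup "v1" "Secret" .Release.Namespace "kafka-ca") }}
{{- $ca := genCA "kafka-ca" 365 }}
apiVersion: v1
kind: Secret
metadata:
  name: kafka-ca
type: kubernetes.io/tls
data:
  tls.crt: {{ $ca.Cert | b64enc | quote }}
  tls.key: {{ $ca.Key | b64enc | quote }}
{{- end }}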
The CA cert and key are available only to the init script. The init container prepares the broker cert (inspired by the scripts above). Only the CA cert is available to the broker pod (the CA key is ignored), plus of course the broker cert and broker key.
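A rough sketch of what such an init container can look like; the image, paths, and openssl invocation here are assumptions for illustration, not the real scripts:
initContainers:
  - name: prepare-broker-cert
    image: alpine/openssl            # any image with openssl would do
    command:
      - /bin/sh
      - -ec
      - |
        # Issue a per-pod broker certificate signed by the shared CA.
        # The SAN must match the advertised listener hostname.
        HOST="${MY_POD_NAME}.test.svc.cluster.local"
        mkdir -p /storage/certs
        printf 'subjectAltName=DNS:%s\n' "$HOST" > /tmp/san.ext
        openssl req -new -newkey rsa:2048 -nodes \
          -subj "/CN=${HOST}" \
          -keyout /storage/certs/kafka.keystore.key \
          -out /tmp/broker.csr
        openssl x509 -req -in /tmp/broker.csr \
          -CA /cacert/tls.crt -CAkey /cacert/tls.key \
          -CAserial /tmp/ca.srl -CAcreateserial \
          -extfile /tmp/san.ext -days 365 \
          -out /storage/certs/kafka.keystore.pem
    env:
      - name: MY_POD_NAME
        valueFrom:
          fieldRef:
            fieldPath: metadata.name
    volumeMounts:
      - name: cacert               # CA cert and key are mounted only here
        mountPath: /cacert
      - name: storage
        mountPath: /storage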
A PEM file with a corresponding KEY file is used as the broker keystore, and the CA cert PEM file is used as the broker truststore. In the StatefulSet it looks like this (based on the Bitnami documentation):
volumeMounts:
  - name: cacert
    mountPath: /opt/bitnami/kafka/config/certs/kafka.truststore.pem
    subPath: tls.crt
  - name: storage
    mountPath: /opt/bitnami/kafka/config/certs/kafka.keystore.pem
    subPath: certs/kafka.keystore.pem
  - name: storage
    mountPath: /opt/bitnami/kafka/config/certs/kafka.keystore.key
    subPath: certs/kafka.keystore.key
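The corresponding volumes section is not shown above; presumably it is something like this (the secret and claim names are assumptions, and in a StatefulSet the storage volume typically comes from volumeClaimTemplates):
volumes:
  - name: cacert
    secret:
      secretName: kafka-ca           # holds tls.crt, plus tls.key for the init step
  - name: storage
    persistentVolumeClaim:
      claimName: kafka-storage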
I think this approach works fine, because when I add the -Djavax.net.debug=all parameter, I can see in the broker logs that SSL communication is used without problems.
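For reference, one way to inject that flag is an extra environment variable; Kafka's start scripts pick up KAFKA_OPTS:
- name: KAFKA_OPTS
  value: "-Djavax.net.debug=all"   # very verbose TLS logging, for debugging only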
The situation is the same when I create just an empty topic with 3 partitions: all brokers create it in their own storage.
If the certs or the approach were bad, this wouldn't work, I think.
The Kafka cluster is used in our Helm chart as a load balancer for messages. We often receive a huge amount of messages in a short time period, but by default the flow is stable. Sometimes we need to restart the consumer application, so in that case we use Kafka as a cache. Deploying an operator to manage Kafka would add a new dependency to the Helm chart, and by default only one Kafka cluster exists in the Kubernetes cluster. Using an operator makes sense if we need many Kafka cluster installations in the Kubernetes cluster.
This is just the first step. In this step I attempt to secure only the communication between brokers, so producers and consumers stay as they are.
When this step is finished, the next step will of course be to secure the communication between the Kafka cluster and the applications, one by one.
Once no unsecured application remains, the ALLOW_PLAINTEXT_LISTENER=yes parameter will be removed.
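Just to sketch the intended end state (renaming the client listener from PLAINTEXT to CLIENT is hypothetical, and the advertised listeners would change accordingly):
- name: KAFKA_CFG_LISTENERS
  value: CLIENT://:9092,CONTROLLER://:9093,INTERNAL://:9094
- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
  value: CLIENT:SSL,CONTROLLER:SSL,INTERNAL:SSL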
Upvotes: 0
Views: 731
Reputation: 63
Thank you for your effort, I solved the problem. It was in the typical place, between chair and keyboard :). I focused too much on Kafka :(.
In this step I introduced a new listener (INTERNAL://:9094), but forgot to add it to the Kubernetes Service object. It looks like this:
spec:
  ports:
    - port: 9092
      name: listener
    - port: 9093
      name: controller
    - port: 9094
      name: internal
The last two lines solved the problem :).
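For completeness, given the per-pod DNS names in the advertised listeners, the full Service presumably looks roughly like this, one per pod (the selector is an assumption; the statefulset.kubernetes.io/pod-name label is set automatically on StatefulSet pods):
apiVersion: v1
kind: Service
metadata:
  name: kafka-inst-0               # one Service per pod, matching the advertised hostname
spec:
  selector:
    statefulset.kubernetes.io/pod-name: kafka-inst-0
  ports:
    - port: 9092
      name: listener
    - port: 9093
      name: controller
    - port: 9094
      name: internal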
Upvotes: 0