Adavan

Reputation: 63

Kafka gets stuck adding replicas

I have a test cluster with 3 Kafka instances in KRaft mode. Each instance is both a broker and a controller. Inter-broker communication is secured with SSL certificates.

After the cluster starts, every instance knows about the others. The problems start when I create a new topic with replicas.

I create it with this command:

/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=ofd-kafka-0:9092 --create --topic containers --partitions 3 --replication-factor 3

The topic exists, but Kafka seems to get stuck adding replicas. The topic description looks like this:

Topic: containers    TopicId: ZrOENsloQXuqTn2jt4NbNA    PartitionCount: 3    ReplicationFactor: 1    Configs: segment.bytes=1073741824,retention.ms=2160000,max.message.bytes=15728640,retention.bytes=-1
    Topic: containers    Partition: 0    Leader: 1    Replicas: 0,1,2    Isr: 1    Adding Replicas: 0,2    Removing Replicas: 
    Topic: containers    Partition: 1    Leader: 2    Replicas: 0,1,2    Isr: 2    Adding Replicas: 0,1    Removing Replicas: 
    Topic: containers    Partition: 2    Leader: 0    Replicas: 2,1,0    Isr: 0    Adding Replicas: 1,2    Removing Replicas: 

In the log of each Kafka instance I can see messages like these:

[2023-06-21 14:02:31,827] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Disconnecting from node 1 due to socket connection setup timeout. The timeout value is 8458 ms. (org.apache.kafka.clients.NetworkClient)
[2023-06-21 14:02:31,827] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Client requested connection close from node 1 (org.apache.kafka.clients.NetworkClient)
[2023-06-21 14:02:31,827] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: (org.apache.kafka.clients.FetchSessionHandler)
java.net.SocketTimeoutException: Failed to connect within 30000 ms
    at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:109)
    at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:78)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:309)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:124)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:123)
    at scala.Option.foreach(Option.scala:407)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:123)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:106)
    at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:97)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2023-06-21 14:02:31,827] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={containers-1=PartitionData(topicId=wA7p8AUSRCmvg-Nc4qL9hA, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[1], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.net.SocketTimeoutException: Failed to connect within 30000 ms
    at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:109)
    at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:78)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:309)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:124)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:123)
    at scala.Option.foreach(Option.scala:407)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:123)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:106)
    at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:97)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

It looks like the instances try to sync with each other but get disconnected. The cluster stays in this state and does not recover.

My current setup looks like this:
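The "socket connection setup timeout" message suggests that the replica fetcher cannot even open a TCP connection to the leader, so a plain reachability test from inside a broker pod can narrow things down before looking at SSL. A minimal sketch, assuming bash with /dev/tcp support and the coreutils timeout command are available in the container; check_port is a hypothetical helper name, not part of Kafka or the Bitnami image:

```shell
#!/usr/bin/env bash

# Try to open a TCP connection to host:port.
# Exit status 0 means the port accepted the connection,
# non-zero means it was refused, unresolvable, or timed out after 3 seconds.
check_port() {
  local host="$1" port="$2"
  timeout 3 bash -c 'exec 3<>"/dev/tcp/$1/$2"' _ "$host" "$port" 2>/dev/null
}

# Print one line per port saying whether it is reachable on the given host.
report_ports() {
  local host="$1"; shift
  local port
  for port in "$@"; do
    if check_port "$host" "$port"; then
      echo "$host:$port reachable"
    else
      echo "$host:$port NOT reachable"
    fi
  done
}
```

Usage, with a host name assembled from the advertised listeners in this setup (an assumption, adjust it to the real peer address): report_ports kafka-inst-1.test.svc.cluster.local 9092 9093 9094. A listener port that is reported as not reachable while the others are fine points at the network path rather than at the certificates.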

apiVersion: apps/v1
kind: StatefulSet
...
spec:
  template:
    spec:
      containers:
      - args:
        - -ec
        - |
          export KAFKA_CFG_NODE_ID="$(echo "$MY_POD_NAME" | grep -o -E '[0-9]*$')"
          /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
        command:
        - /bin/bash
        env:
        - name: BITNAMI_DEBUG
          value: "true"
        - name: KAFKA_KRAFT_CLUSTER_ID
          value: "Y2MyZmRlNDY3MjM0NGU4Yj"
        - name: KAFKA_CFG_PROCESS_ROLES
          value: controller,broker
        - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
          value: 0@kafka-inst-0:9093,1@kafka-inst-1:9093,2@kafka-inst-2:9093
        - name: KAFKA_CFG_LISTENERS
          value: PLAINTEXT://:9092,CONTROLLER://:9093,INTERNAL://:9094
        - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
          value: CONTROLLER
        - name: KAFKA_HEAP_OPTS
          value: -Xmx1G -Xms1G
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        - name: KAFKA_CFG_MIN_INSYNC_REPLICAS
          value: "2"
        - name: KAFKA_CFG_DEFAULT_REPLICATION_FACTOR
          value: "3"
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        - name: KAFKA_CFG_ADVERTISED_LISTENERS
          value: INTERNAL://$(MY_POD_NAME).test.svc.cluster.local:9094,PLAINTEXT://$(MY_POD_NAME).test.svc.cluster.local:9092
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: INTERNAL
        - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
          value: INTERNAL:SSL,CONTROLLER:SSL,PLAINTEXT:PLAINTEXT
        - name: KAFKA_TLS_TYPE
          value: PEM
        - name: KAFKA_CFG_INITIAL_BROKER_REGISTRATION_TIMEOUT_MS
          value: "240000"
        image: bitnami/kafka:3.3.2-debian-11-r49
        imagePullPolicy: IfNotPresent
...

I have the same problem with different Kafka versions (3.3.2, 3.4.1) and image releases. The cluster works fine until I add replicas.

Does anybody have a clue what is wrong? Any hints would be valuable as well.

Many thanks!

Edit: Making SSL certificates

I took inspiration from the blog and adapted its scripts to my needs. The self-signed CA certificate is generated in the Helm chart with the genCA function if the secret holding the CA certificate is missing; if the secret exists, it is left as is. This is enough for development purposes. Later, especially in production, I will manage the lifecycle of the CA certificate with cert-manager.

The CA certificate and key are available only to the init script. The init script prepares the broker certificate (based on the scripts mentioned above). The broker pod gets only the CA certificate (the CA key is left out) and, of course, the broker certificate and broker key.

The broker keystore is a PEM file with a corresponding key file. The broker truststore is the CA certificate PEM file. In the StatefulSet it looks like this (based on the Bitnami documentation):

volumeMounts:
- name: cacert
  mountPath: /opt/bitnami/kafka/config/certs/kafka.truststore.pem
  subPath: tls.crt
- name: storage
  mountPath: /opt/bitnami/kafka/config/certs/kafka.keystore.pem
  subPath: certs/kafka.keystore.pem
- name: storage
  mountPath: /opt/bitnami/kafka/config/certs/kafka.keystore.key
  subPath: certs/kafka.keystore.key

I think this approach works fine, because when I add the -Djavax.net.debug=all parameter, the broker logs show that SSL communication is used without problems.

The same holds when I create just an empty topic with 3 partitions. All brokers create it in their own storage.

If the certificates or the approach were wrong, I think this would not work.

Edit: Usage

The Kafka cluster is used in our Helm chart as a load balancer for messages. We often receive a huge number of messages in a short period, although the flow is normally stable. Sometimes we need to restart the consumer application, and in that case we use Kafka as a cache. Deploying an operator to manage Kafka would add a new dependency to the Helm chart, and there is normally only one Kafka cluster in the Kubernetes cluster. Using an operator makes sense when many Kafka clusters have to be installed in the Kubernetes cluster.

Edit: Plaintext listeners

This is just the first step. In this step I am trying to secure communication only between brokers, so producers and consumers stay as they are. Once this step is finished, the next step will of course be to secure communication between the Kafka cluster and the applications, one by one. When no unsecured applications remain, the ALLOW_PLAINTEXT_LISTENER=yes parameter will be removed.

Upvotes: 0

Views: 731

Answers (1)

Adavan

Reputation: 63

Thank you for your effort, I solved the problem. It was in the typical place, between the chair and the keyboard :). I focused on Kafka too much :(.

In this step I introduced a new listener (INTERNAL://:9094) but forgot to add it to the Kubernetes Service object. It now looks like this:

spec:
  ports:
    - port: 9092
      name: listener
    - port: 9093
      name: controller
    - port: 9094
      name: internal

The last two lines solved the problem :).
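To catch this kind of mismatch earlier, the listener ports can be compared with the ports declared on the Service. A minimal sketch in bash; listener_ports and missing_service_ports are hypothetical helper names, and the Service ports are passed in by hand here rather than read from the cluster:

```shell
#!/usr/bin/env bash

# Print the port of every entry in a Kafka listeners string, one per line.
# Example entry: INTERNAL://:9094 -> 9094
listener_ports() {
  local entry
  local -a entries
  IFS=',' read -r -a entries <<< "$1"
  for entry in "${entries[@]}"; do
    echo "${entry##*:}"
  done
}

# Print every listener port that is absent from the given Service ports.
# Usage: missing_service_ports "<listeners string>" <service port>...
missing_service_ports() {
  local listeners="$1"; shift
  local port
  for port in $(listener_ports "$listeners"); do
    case " $* " in
      *" $port "*) ;;        # port is declared on the Service
      *) echo "$port" ;;     # port is missing from the Service
    esac
  done
}
```

With the listeners from the question and the Service as it was before the fix, missing_service_ports "PLAINTEXT://:9092,CONTROLLER://:9093,INTERNAL://:9094" 9092 9093 prints 9094; with 9094 added to the Service ports it prints nothing.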

Upvotes: 0
