Nicola Ben

Reputation: 11337

readinessProbe (k8s) for kafka statefulset causes bad deployment

I have a Kubernetes cluster (v1.9.0) in which I have deployed 3 ZooKeeper pods (working correctly), and I want to run 3 Kafka replicas.

The following StatefulSet works only if I comment out the readinessProbe section.

apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: kafka
  replicas: 3
  template:
    metadata:
      labels:
        app: kafka
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                    - kafka
              topologyKey: "kubernetes.io/hostname"
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
             - weight: 1
               podAffinityTerm:
                 labelSelector:
                    matchExpressions:
                      - key: "app"
                        operator: In
                        values:
                        - zookeeper
                 topologyKey: "kubernetes.io/hostname"
      terminationGracePeriodSeconds: 30
      containers:
      - name: kafka
        imagePullPolicy: IfNotPresent
        image: sorintdev/kafka:20171204a
        resources:
          requests:
            memory: 500Mi
            cpu: 200m
        ports:
        - containerPort: 9092
          name: server
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9092 \
          --override zookeeper.connect=zookeeper-0.zookeeper:2181,zookeeper-1.zookeeper:2181,zookeeper-2.zookeeper:2181 \
          --override log.dirs=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override broker.id.generation.enable=true \
          --override compression.type=producer \
          --override delete.topic.enable=false \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.message.format.version=1.0 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=1 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override inter.broker.protocol.version=1.0 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override reserved.broker.max.id=1000 \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false "
        env:
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=WARN"
        - name: ZOOKEEPER_CONNECT
          value: "zookeeper"
        volumeMounts:
        - name: data
          mountPath: /var/lib/kafka
        #
        # ATTENTION: readinessProbe causes the statefulset not to deploy correctly, don't use it!
        #
        readinessProbe:
          initialDelaySeconds: 10
          timeoutSeconds: 5
          exec:
            command:
            - sh
            - -c
            - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9092"
        livenessProbe:
          initialDelaySeconds: 10
          timeoutSeconds: 5
          exec:
            command:
            - sh
            - -c
            - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9092"
      securityContext:
        runAsUser: 1000
        fsGroup: 1000
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      resources:
        requests:
          storage: 250Mi
      accessModes:
      - ReadWriteOnce
      storageClassName: nfs

With the readinessProbe in place I get only 2 of the 3 replicas, and only one of them ever becomes ready:

NAME                        READY     STATUS    RESTARTS   AGE
kafka-0                     1/1       Running   0          40s
kafka-1                     0/1       Running   0          20s

kafka-1 logs are:

[2017-12-21 11:23:24,964] INFO Starting the log cleaner (kafka.log.LogCleaner)
[2017-12-21 11:23:24,966] INFO [kafka-log-cleaner-thread-0]: Starting (kafka.log.LogCleaner)
[2017-12-21 11:23:25,394] INFO [controller-event-thread]: Starting (kafka.controller.ControllerEventManager$ControllerEventThread)
[2017-12-21 11:23:25,403] DEBUG [Controller id=1] Broker 0 has been elected as the controller, so stopping the election process. (kafka.controller.KafkaController)

And kafka-0 ones are:

[2017-12-21 11:25:48,184] WARN [Controller-0-to-broker-1-send-thread]: Controller 0's connection to broker kafka-1.kafka.default.svc.cluster.local:9092 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to kafka-1.kafka.default.svc.cluster.local:9092 (id: 1 rack: null) failed.
        at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:68)
        at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:269)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:223)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)

Producing messages on kafka-0 works, but consuming doesn't.
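(To reproduce that check, the stock console clients can be used from inside one of the broker pods, along these lines; the topic name here is only an example.)

# produce a few messages against the only ready broker
/opt/kafka/bin/kafka-console-producer.sh --broker-list kafka-0.kafka:9092 --topic test
# consuming from the same topic returns nothing
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-0.kafka:9092 --topic test --from-beginning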

If I remove the readinessProbe, all 3 replicas are created and Kafka works like a charm. It seems that the readinessProbe prevents the controller election process from completing correctly. I can't figure out why the third replica is not created. Any suggestions are very welcome.
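(For reference, the probe command can also be run by hand inside the stuck pod, and the probe failures show up in the pod events; illustrative commands below.)

# run the readiness command manually inside the not-ready broker
kubectl exec kafka-1 -- /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9092
# probe failures are listed under Events
kubectl describe pod kafka-1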

Upvotes: 8

Views: 6473

Answers (2)

Vineeth

Reputation: 1032

We built the following startupProbe for our Kafka setup in Kubernetes, which solved nearly all of our problems.

startupProbe:
  exec:
    command:
      - sh
      - '-c'
      - >
        [[ "$(kafka-broker-api-versions --bootstrap-server
        localhost:9092 | grep 9092 | wc -l)" == "3" ]] || { echo >&2
        "Broker count is not 3"; exit 1; } && [[ "$(curl -s
        kafka-controller-headless.my-namespace:9102/metrics
        | grep
        kafka_controller_kafkacontroller_preferredreplicaimbalancecount
        | tail -n 1 | grep -o '[^ ]\+$')" == "0.0" ]] || { echo >&2
        "Replica Imbalance not 0"; exit 1; }

Along with that, we have a simple TCP port check for the liveness probe and a CLI check for the readiness probe, like the ones you see in most of the answers (sketched at the end of this answer).

Here, kafka-controller-headless.my-namespace is the Kubernetes Service name of the controller (KRaft).
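For reference, a minimal sketch of what that headless Service looks like in our setup (the selector labels and the metrics port name are assumptions about our chart, not something to copy verbatim):

apiVersion: v1
kind: Service
metadata:
  name: kafka-controller-headless
  namespace: my-namespace
spec:
  clusterIP: None                           # headless: resolves directly to the controller pods
  selector:
    app.kubernetes.io/name: kafka           # assumed labels, adjust to your chart
    app.kubernetes.io/component: controller
  ports:
  - name: metrics
    port: 9102                              # the /metrics endpoint queried by the startup probe above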

What the startup probe does:

  • It first checks whether the broker count matches (3 in our setup).
  • It then checks that the "replica imbalance" is zero, which keeps brokers from going down until the cluster is back in balance.

This way you also offload a lot of overhead from the readiness probe, which runs throughout the pod's lifecycle, onto the startup probe, which only runs until it first succeeds.
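The liveness and readiness probes that go along with it look roughly like this (a sketch only; the port, script name and timings depend on your image and chart):

livenessProbe:
  tcpSocket:
    port: 9092                              # plain TCP check: is the broker listener accepting connections?
  initialDelaySeconds: 30
  periodSeconds: 10
  timeoutSeconds: 5
readinessProbe:
  exec:
    command:
      - sh
      - '-c'
      - kafka-broker-api-versions --bootstrap-server localhost:9092
  initialDelaySeconds: 30
  periodSeconds: 10
  timeoutSeconds: 5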

Upvotes: 0

Himansu

Reputation: 328

I ran into the exact same issue with Kafka. I tried the following changes and they resolved it, so maybe this will help: for readiness, instead of executing a script, I used a tcpSocket check, and the liveness probe is done by command.

You can adjust initialDelaySeconds to suit your requirements, though.

    readinessProbe:
      tcpSocket:
        port: 9092
      timeoutSeconds: 5
      periodSeconds: 5
      initialDelaySeconds: 40
    livenessProbe:
      exec:
        command:
        - sh
        - -c
        - "kafka-broker-api-versions.sh --bootstrap-server=localhost:9093"
      timeoutSeconds: 5
      periodSeconds: 5
      initialDelaySeconds: 70


$ kubectl get pods -w -l app=kafka
NAME      READY   STATUS    RESTARTS   AGE
kafka-0   1/1     Running   0          3m
kafka-1   1/1     Running   0          2m
kafka-2   1/1     Running   0          1m

Upvotes: 4
