Nguyen DN
Nguyen DN

Reputation: 140

Kafka broker connection fails when creating a new topic in a cluster of 3 brokers

I'm trying to set up a Kafka cluster with 3 brokers on Docker.

The problem is: whenever I perform an operation (e.g. create/list/delete topics), there is always one broker that the others fail to connect to, and its Docker container restarts. This problem doesn't happen with a cluster of two brokers or with a single broker.

My steps to reproduce are:

I'm new to Kafka. I think I'm just making some silly mistake, but I don't have any clue what it is. I searched through the docs but haven't found anything yet.

Here is my docker-compose file:

# Compose file format version.
version: '3.9'

networks:
  # Bridge network that every service in this file attaches to.
  kafka-cluster:
    driver: bridge

services:
  # ZooKeeper node that all three brokers point at (zookeeper:2181).
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    restart: unless-stopped
    networks:
      - kafka-cluster
    ports:
      # Quoted so the mapping is always read as a string.
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      # Whitelists all ZooKeeper four-letter-word (4lw) commands.
      KAFKA_OPTS: '-Dzookeeper.4lw.commands.whitelist=*'
      # Optional settings, currently disabled:
      # ZOOKEEPER_SERVER_ID: 1
      # ZOOKEEPER_TICK_TIME: 2000
      # ZOOKEEPER_SERVERS: "zookeeper:22888:23888"

  # Broker 1: CLIENT listener on 9092, EXTERNAL listener on 9093.
  kafka1:
    image: confluentinc/cp-kafka:latest
    container_name: kafka1
    restart: unless-stopped
    depends_on:
      - zookeeper
    networks:
      - kafka-cluster
    ports:
      # Publishes the EXTERNAL listener port on the host.
      - '9093:9093'
    environment:
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Both listeners are plaintext.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_LISTENERS: 'CLIENT://:9092,EXTERNAL://:9093'
      # CLIENT is advertised under the container name, EXTERNAL as localhost.
      KAFKA_ADVERTISED_LISTENERS: 'CLIENT://kafka1:9092,EXTERNAL://localhost:9093'
      # Broker-to-broker traffic goes over the CLIENT listener.
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

  # Broker 2: CLIENT listener on 9092, EXTERNAL listener on 9094.
  kafka2:
    image: confluentinc/cp-kafka:latest
    container_name: kafka2
    restart: unless-stopped
    depends_on:
      - zookeeper
    networks:
      - kafka-cluster
    ports:
      # Publishes the EXTERNAL listener port on the host.
      - '9094:9094'
    environment:
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Both listeners are plaintext.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_LISTENERS: 'CLIENT://:9092,EXTERNAL://:9094'
      # CLIENT is advertised under the container name, EXTERNAL as localhost.
      KAFKA_ADVERTISED_LISTENERS: 'CLIENT://kafka2:9092,EXTERNAL://localhost:9094'
      # Broker-to-broker traffic goes over the CLIENT listener.
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

  # Broker 3: CLIENT listener on 9092, EXTERNAL listener on 9095.
  kafka3:
    image: confluentinc/cp-kafka:latest
    container_name: kafka3
    restart: unless-stopped
    depends_on:
      - zookeeper
    networks:
      - kafka-cluster
    ports:
      # Publishes the EXTERNAL listener port on the host.
      - '9095:9095'
    environment:
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Both listeners are plaintext.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_LISTENERS: 'CLIENT://:9092,EXTERNAL://:9095'
      # CLIENT is advertised under the container name, EXTERNAL as localhost.
      KAFKA_ADVERTISED_LISTENERS: 'CLIENT://kafka3:9092,EXTERNAL://localhost:9095'
      # Broker-to-broker traffic goes over the CLIENT listener.
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

  # Kafdrop web UI, published on host port 9000 and pointed at the brokers'
  # in-network CLIENT listeners.
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    ports:
      - "9000:9000"
    environment:
      # Mapping form is used deliberately: in list form (`- NAME="value"`) the
      # double quotes are not YAML quoting, so they would become part of the
      # value handed to the container.
      KAFKA_BROKERCONNECT: "kafka1:9092,kafka2:9092,kafka3:9092"
      JVM_OPTS: "-Xms32M -Xmx64M"
      SERVER_SERVLET_CONTEXTPATH: "/"
    depends_on:
      # Every broker named in KAFKA_BROKERCONNECT is started before Kafdrop.
      - kafka1
      - kafka2
      - kafka3
    networks:
      - kafka-cluster

Here is the error log on the other 2 brokers:

[2022-01-17 04:32:40,078] WARN [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1002, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={test-topic-3-1=PartitionData(fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty), test-topic-2-1=PartitionData(fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=28449961, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to kafka1:9092 (id: 1001 rack: null) failed.
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:104)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:218)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:321)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:136)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2022-01-17 04:32:42,088] WARN [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Connection to node 1001 (kafka1/192.168.48.3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-01-17 04:32:42,088] INFO [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error sending fetch request (sessionId=28449961, epoch=INITIAL) to node 1001: (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to kafka1:9092 (id: 1001 rack: null) failed.
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:104)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:218)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:321)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:136)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

Upvotes: 0

Views: 1250

Answers (1)

OneCricketeer
OneCricketeer

Reputation: 191681

Assuming you don't need host connections (since you're running the Kafka CLI commands directly in the containers), you could greatly simplify your Compose file:

  1. Remove host ports
  2. Remove non-CLIENT listeners, and stick to the defaults.
  3. Remove the Compose network (for debugging) since one is automatically created

All in all, you'd end up with something like this:

version: '3.8'

# Environment shared by every broker; merged into each service with `<<`.
x-kafka-setup: &kafka-setup
  ALLOW_PLAINTEXT_LISTENER: 'yes'
  KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper:2181'

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.7
    environment:
      ALLOW_ANONYMOUS_LOGIN: 'yes'

  # The three brokers differ only in their broker id.
  kafka1:
    # The image is anchored here once and aliased by kafka2 and kafka3.
    image: &broker-image docker.io/bitnami/kafka:3
    depends_on:
      - zookeeper
    environment:
      <<: *kafka-setup
      KAFKA_BROKER_ID: '1'

  kafka2:
    image: *broker-image
    depends_on:
      - zookeeper
    environment:
      <<: *kafka-setup
      KAFKA_BROKER_ID: '2'

  kafka3:
    image: *broker-image
    depends_on:
      - zookeeper
    environment:
      <<: *kafka-setup
      KAFKA_BROKER_ID: '3'

  # Only Kafdrop publishes a host port; the brokers are reached in-network.
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: 'kafka1:9092,kafka2:9092,kafka3:9092'
      JVM_OPTS: '-Xms32M -Xmx64M'
      SERVER_SERVLET_CONTEXTPATH: '/'

enter image description here

Upvotes: 1

Related Questions