Reputation: 140
I'm trying to setup a kafka cluster with 3 brokers on Docker.
The problem is: when I do an operation (i.e create/list/delete topics), there's always 1 broker fails to be connected and restart Docker container. This problem doesn't happen on a cluster of 2 or single broker.
My steps to reproduce is:
kafka-topics --bootstrap-server ":9092" --create --topic topic-name --partitions 3 --replication-factor 3
I'm new to Kafka. I think I'm just having some silly mistakes but I don't have any clue of what it is. I search through docs but haven't found yet.
Here is my docker-compose file:
version: "3.9"
networks:
kafka-cluster:
driver: bridge
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
# ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
# ZOOKEEPER_TICK_TIME: 2000
# ZOOKEEPER_SERVERS: "zookeeper:22888:23888"
KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"
ports:
- 2181:2181
restart: unless-stopped
networks:
- kafka-cluster
kafka1:
image: confluentinc/cp-kafka:latest
container_name: kafka1
depends_on:
- zookeeper
ports:
- "9093:9093"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: CLIENT://:9092,EXTERNAL://:9093
KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka1:9092,EXTERNAL://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
restart: unless-stopped
networks:
- kafka-cluster
kafka2:
image: confluentinc/cp-kafka:latest
container_name: kafka2
depends_on:
- zookeeper
ports:
- "9094:9094"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: CLIENT://:9092,EXTERNAL://:9094
KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka2:9092,EXTERNAL://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
restart: unless-stopped
networks:
- kafka-cluster
kafka3:
image: confluentinc/cp-kafka:latest
container_name: kafka3
depends_on:
- zookeeper
ports:
- "9095:9095"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: CLIENT://:9092,EXTERNAL://:9095
KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka3:9092,EXTERNAL://localhost:9095
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
restart: unless-stopped
networks:
- kafka-cluster
kafdrop:
image: obsidiandynamics/kafdrop:latest
container_name: kafdrop
ports:
- "9000:9000"
environment:
- KAFKA_BROKERCONNECT=kafka1:9092,kafka2:9092,kafka3:9092
- JVM_OPTS="-Xms32M -Xmx64M"
- SERVER_SERVLET_CONTEXTPATH="/"
depends_on:
- kafka1
networks:
- kafka-cluster
Here is the error log on the other 2 brokers:
[2022-01-17 04:32:40,078] WARN [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1002, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={test-topic-3-1=PartitionData(fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty), test-topic-2-1=PartitionData(fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=28449961, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to kafka1:9092 (id: 1001 rack: null) failed.
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:104)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:218)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:321)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:136)
at scala.Option.foreach(Option.scala:437)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2022-01-17 04:32:42,088] WARN [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Connection to node 1001 (kafka1/192.168.48.3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-01-17 04:32:42,088] INFO [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error sending fetch request (sessionId=28449961, epoch=INITIAL) to node 1001: (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to kafka1:9092 (id: 1001 rack: null) failed.
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:104)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:218)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:321)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:136)
at scala.Option.foreach(Option.scala:437)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
Upvotes: 0
Views: 1250
Reputation: 191681
Assuming you don't need host connections (since you're running the Kafka CLI commands directly in the containers), you could greatly simplify your Compose file
CLIENT
listeners, and stick to the defaults.All in all, you'd end up with something like this
x-kafka-setup: &kafka-setup
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
ALLOW_PLAINTEXT_LISTENER: 'yes'
version: "3.8"
services:
zookeeper:
image: docker.io/bitnami/zookeeper:3.7
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka1:
image: &broker-image docker.io/bitnami/kafka:3
environment:
KAFKA_BROKER_ID: 1
<<: *kafka-setup
depends_on:
- zookeeper
kafka2:
image: *broker-image
environment:
KAFKA_BROKER_ID: 2
<<: *kafka-setup
depends_on:
- zookeeper
kafka3:
image: *broker-image
environment:
KAFKA_BROKER_ID: 3
<<: *kafka-setup
depends_on:
- zookeeper
kafdrop:
image: obsidiandynamics/kafdrop:latest
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: kafka1:9092,kafka2:9092,kafka3:9092
JVM_OPTS: "-Xms32M -Xmx64M"
SERVER_SERVLET_CONTEXTPATH: /
depends_on:
- kafka1
- kafka2
- kafka3
Upvotes: 1