I have multiple services running in Docker, including service A (a consumer) and Kafka. Below is the docker-compose.yaml:
version: '3.8'
services:
  service-a:
    container_name: service-a
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8881:8881"
    environment:
      - ENVIRONMENT=development
    depends_on:
      - kafka
      - chroma
  kafka:
    container_name: my-kafka
    image: bitnami/kafka:latest
    ports:
      - "9094:9094"
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
  chroma:
    container_name: learnsuite-chroma
    image: chromadb/chroma:latest
    ports:
      - "8882:8000"
Note that the Kafka bootstrap_servers setting for service-a (which runs in a Docker container) is
bootstrap_servers: kafka:9092
This setting is used when service-a creates its consumer:
from kafka import KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic

# `settings` and `logger` are provided elsewhere in the application.

def get_consumer_for_topic(topic: str):
    group_id = settings.kafka.group
    bootstrap = settings.kafka.bootstrap_servers
    bootstrap_servers = bootstrap.split(',') if bootstrap else []
    # Connects to the broker immediately, so this fails if Kafka is not up yet
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        topics = admin_client.list_topics()
        if topic not in topics:
            # Create the topic if it doesn't exist
            new_topic = NewTopic(name=topic, num_partitions=1, replication_factor=1)
            admin_client.create_topics([new_topic])
            logger.info(f"Topic '{topic}' created.")
        else:
            logger.info(f"Topic '{topic}' already exists.")
    except Exception as e:
        logger.error(f"Error ensuring topic exists: {e}")
    finally:
        admin_client.close()
    return KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        auto_offset_reset='earliest'
    )
When I run docker-compose up, I get this error:
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
I know many people have asked about this error, but I am still stuck after searching around. Please advise.
Answer: Since the question has been marked for deletion, it seems I cannot post an answer, so I am updating here for anyone who runs into a similar issue.
The cause is that service-a boots very quickly, so it reached the topic-creation code before Kafka had fully started. In Compose, depends_on only controls the order in which containers are started; it does not wait for Kafka to be ready to accept connections. I therefore added a retry mechanism so that service-a waits for the broker. After that, the logs show that the first attempt failed, the second attempt connected, and the service then booted successfully.
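The retry code itself is not shown above, so here is a minimal sketch of what such a mechanism could look like. The helper name retry_until_ready, the attempt count, and the delay are all assumptions for illustration, not the code actually used in service-a. The helper is generic and does not import kafka; the trailing comments show how it might wrap the KafkaAdminClient call from the question, catching kafka-python's NoBrokersAvailable.

```python
import logging
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


def retry_until_ready(
    factory: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    attempts: int = 10,          # hypothetical default
    delay_seconds: float = 3.0,  # hypothetical default
) -> T:
    """Call factory() until it succeeds or the attempts are used up.

    Only exceptions listed in retry_on trigger a retry; the last
    failure is re-raised so the caller still sees the real error.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return factory()
        except retry_on as exc:
            if attempt == attempts:
                raise
            logger.warning(
                "Attempt %d/%d failed (%s); retrying in %.1fs",
                attempt, attempts, exc, delay_seconds,
            )
            time.sleep(delay_seconds)
    raise AssertionError("unreachable")


# Possible use inside get_consumer_for_topic (assumes kafka-python):
#
#   from kafka.admin import KafkaAdminClient
#   from kafka.errors import NoBrokersAvailable
#
#   admin_client = retry_until_ready(
#       lambda: KafkaAdminClient(bootstrap_servers=bootstrap_servers),
#       retry_on=(NoBrokersAvailable,),
#   )
```

Restricting retry_on to NoBrokersAvailable keeps genuine configuration errors (for example a wrong hostname format) from being retried silently, and capping the attempts means the container still exits with the original error if Kafka never comes up.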
Upvotes: 0
Views: 61