Reputation: 25
I have a task to create an application with kafka in kubernetes. But when i connect consumer to kafka i got error:
WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-test-1, groupId=test] Connection to node -1 (kafka-service/10.99.233.131:9092) could not be established. Broker may not be available.
WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-test-1, groupId=test] Bootstrap broker kafka-service:9092 (id: -1 rack: null) disconnected
Here is my yaml file for kubernetes:
apiVersion: v1
kind: Service
metadata:
name: kafka-service
labels:
name: kafka
spec:
ports:
- port: 9092
targetPort: 9092
protocol: TCP
selector:
name: kafka
---
apiVersion: v1
kind: Service
metadata:
name: zookeeper-service
labels:
name: zookeeper
spec:
ports:
- name: client
port: 2181
protocol: TCP
- name: follower
port: 2888
protocol: TCP
- name: leader
port: 3888
protocol: TCP
selector:
name: zookeeper
type: LoadBalancer
---
apiVersion: v1
kind: Pod
metadata:
name: zookeeper
labels:
name: zookeeper
spec:
containers:
- name: zookeeper
image: zookeeper:3.7.0
---
apiVersion: v1
kind: Pod
metadata:
name: kafka
labels:
name: kafka
spec:
containers:
- name: kafka
image: wurstmeister/kafka:2.13-2.6.0
imagePullPolicy: "IfNotPresent"
env:
- name: KAFKA_ADVERTISED_PORT
value: "666"
- name: KAFKA_ADVERTISED_HOST_NAME
value: localhost
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper-service:2181
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: INSIDE:PLAINTEXT
- name: KAFKA_ADVERTISED_LISTENERS
value: INSIDE://:666
- name: KAFKA_LISTENERS
value: INSIDE://:666
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: INSIDE
ports:
- containerPort: 9092
Simple java code:
@Service
public class Consumer {
@KafkaListener(topics = "new-topic",groupId = "test")
public void consumeMessage(String message){
System.out.println("************************");
System.out.println(message);
System.out.println("************************");
}
}
How to create kafka broker without scaling just for tests?
Upvotes: 0
Views: 1138
Reputation: 25
For me very helpful was this post: https://www.confluent.io/blog/kafka-listeners-explained/
I am changed yaml file to this:
apiVersion: v1
kind: Pod
metadata:
name: kafka-service
labels:
name: kafka-service
spec:
containers:
- name: kafka-service
image: wurstmeister/kafka:2.13-2.6.0
imagePullPolicy: "IfNotPresent"
env:
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper-service:2181
- name: KAFKA_BROKER_ID
value: "1"
- name: KAFKA_LISTENERS
value: IN://:9092,OUT://:9093
- name: KAFKA_ADVERTISED_LISTENERS
value: IN://localhost:9092,OUT://kafka-service:9093
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: IN:PLAINTEXT,OUT:PLAINTEXT
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: IN
This means that port 9093 оpen inside kubernetes network and port 9092 open from host machine. After that all work correctly.
Upvotes: 0