Reputation: 685
Which chart: bitnami/kafka (https://github.com/bitnami/charts/tree/master/bitnami/kafka)
Describe the bug
I am following the tutorial "Build a Scalable, Fault-Tolerant Messaging Cluster on Kubernetes with Apache Kafka and MongoDB".
To resolve the extraDeploy issue I followed extraDeploy doesn't render with example in documentation #5649. That issue is resolved and I have a working configuration: a custom Kafka Connect image built from the Dockerfile below, plus the following extraDeploy section in values.yml.
FROM bitnami/kafka:latest
RUN mkdir -p /opt/bitnami/kafka/plugins && \
    cd /opt/bitnami/kafka/plugins && \
    curl --remote-name --location --silent https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.6.1/mongo-kafka-connect-1.6.1-all.jar
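I build the image locally, roughly along these lines (a sketch; the tag matches the image referenced in extraDeploy below, and the load step is only there because imagePullPolicy is Never on my local cluster, e.g. minikube or kind):

docker build -t kafka-connect-bitnami:5.22 .
# make the locally built image visible to the cluster (only needed with imagePullPolicy: Never)
minikube image load kafka-connect-bitnami:5.22    # or: kind load docker-image kafka-connect-bitnami:5.22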
extraDeploy:
  - |
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    spec:
      replicas: 1
      selector:
        matchLabels: {{- include "common.labels.matchLabels" . | nindent 6 }}
          app.kubernetes.io/component: connector
      template:
        metadata:
          labels: {{- include "common.labels.standard" . | nindent 8 }}
            app.kubernetes.io/component: connector
        spec:
          containers:
            - name: connect
              image: kafka-connect-bitnami:5.22
              imagePullPolicy: Never
              command: [ "/bin/bash", "-c", "/opt/bitnami/kafka/bin/connect-standalone.sh /config/connect-standalone.properties /config/mongodb-sink.properties"]
              ports:
                - name: connector
                  containerPort: 8083
              volumeMounts:
                - name: configuration
                  mountPath: /config
          volumes:
            - name: configuration
              configMap:
                name: {{ include "kafka.fullname" . }}-connect
  - |
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    data:
      connect-standalone.properties: |-
        bootstrap.servers={{ include "kafka.fullname" . }}.{{ .Release.Namespace }}.svc.{{ .Values.clusterDomain }}:{{ .Values.service.port }}
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=true
        value.converter.schemas.enable=true
        offset.storage.file.filename=/tmp/connect.offsets
        offset.flush.interval.ms=20000
        plugin.path=/opt/bitnami/kafka/plugins
      mongodb-source.properties: |-
        connection.uri=mongodb://user:[email protected]:27017/mydb
        name=mongo-source-connector
        topics=source-topic
        connector.class=com.mongodb.kafka.connect.MongoSourceConnector
        tasks.max=1
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=false
        value.converter.schemas.enable=false
        database=mydb
        collection=source
        batch.size=0
        change.stream.full.document=updateLookup
        pipeline=[]
        collation=
      mongodb-sink.properties: |-
        connection.uri=mongodb://user:[email protected]:27017/mydb
        name=mongo-sink-connector
        topics=sink-topic
        connector.class=com.mongodb.kafka.connect.MongoSinkConnector
        tasks.max=1
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=false
        value.converter.schemas.enable=false
        database=mydb
        collection=sink
  - |
    apiVersion: v1
    kind: Service
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    spec:
      ports:
        - protocol: TCP
          port: 8083
          targetPort: connector
      selector: {{- include "common.labels.matchLabels" . | nindent 4 }}
        app.kubernetes.io/component: connector
and then I run:
helm install kafka bitnami/kafka -f values.yml
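As a sanity check, the extraDeploy manifests can be rendered locally before installing (same values file):

helm template kafka bitnami/kafka -f values.yml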
I have the following setup up and running for the mongo-sink-connector:
NAME                                READY   STATUS    RESTARTS      AGE
pod/kafka-0                         1/1     Running   1 (35h ago)   35h
pod/kafka-client                    1/1     Running   0             43h
pod/kafka-connect-669487944-gb4p7   1/1     Running   0             57m
pod/kafka-zookeeper-0               1/1     Running   0             35h
pod/mongodb-arbiter-0               1/1     Running   0             42h
pod/mongodb-client                  1/1     Running   0             41h
pod/mongodb-primary-0               2/2     Running   0             42h
pod/mongodb-secondary-0             2/2     Running   0             42h

NAME                               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
service/kafka                      ClusterIP   10.98.236.199   <none>        9092/TCP                     35h
service/kafka-connect              ClusterIP   10.105.58.215   <none>        8083/TCP                     35h
service/kafka-headless             ClusterIP   None            <none>        9092/TCP,9093/TCP            35h
service/kafka-zookeeper            ClusterIP   10.108.22.188   <none>        2181/TCP,2888/TCP,3888/TCP   35h
service/kafka-zookeeper-headless   ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP   35h
service/kubernetes                 ClusterIP   10.96.0.1       <none>        443/TCP                      14d
service/mongodb                    ClusterIP   10.96.105.153   <none>        27017/TCP,9216/TCP           42h
service/mongodb-headless           ClusterIP   None            <none>        27017/TCP                    42h

NAME                            READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/kafka-connect   1/1     1            1           35h

NAME                                       DESIRED   CURRENT   READY   AGE
replicaset.apps/kafka-connect-54cff6f879   0         0         0       35h
replicaset.apps/kafka-connect-59fcf7754c   0         0         0       24h
replicaset.apps/kafka-connect-64c5697f54   0         0         0       21h
replicaset.apps/kafka-connect-669487944    1         1         1       21h
replicaset.apps/kafka-connect-66c6dd4679   0         0         0       35h
replicaset.apps/kafka-connect-84ffbffd5c   0         0         0       23h

NAME                                 READY   AGE
statefulset.apps/kafka               1/1     35h
statefulset.apps/kafka-zookeeper     1/1     35h
statefulset.apps/mongodb-arbiter     1/1     42h
statefulset.apps/mongodb-primary     1/1     42h
statefulset.apps/mongodb-secondary   1/1     42h
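The ConfigMap created via extraDeploy can also be inspected directly (a sketch; the name kafka-connect follows from the release name plus the -connect suffix):

kubectl get configmap kafka-connect --namespace default -o yaml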
I can publish messages to MongoDB successfully with the above configuration. Below are some messages from the kafka-connect pod after a successful sink:
[2022-01-19 11:52:25,476] INFO [mongo-sink-connector|task-0] [Consumer clientId=connector-consumer-mongo-sink-connector-0, groupId=connect-mongo-sink-connector] Setting offset for partition sink-topic-0 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka-0.kafka-headless.default.svc.cluster.local:9092 (id: 0 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:818)
[2022-01-19 11:52:25,519] INFO [mongo-sink-connector|task-0] Cluster created with settings {hosts=[mongodb.default.svc.cluster.local:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'} (org.mongodb.driver.cluster:71)
[2022-01-19 11:52:25,522] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:5, serverValue:7744}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)
[2022-01-19 11:52:25,522] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:4, serverValue:7743}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)
[2022-01-19 11:52:25,523] INFO [mongo-sink-connector|task-0] Monitor thread successfully connected to server with description ServerDescription{address=mongodb.default.svc.cluster.local:27017, type=REPLICA_SET_PRIMARY, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=2650238, setName='rs0', canonicalAddress=mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017, hosts=[mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017, mongodb-secondary-0.mongodb-headless.default.svc.cluster.local:27017], passives=[], arbiters=[mongodb-arbiter-0.mongodb-headless.default.svc.cluster.local:27017], primary='mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017', tagSet=TagSet{[]}, electionId=7fffffff0000000000000002, setVersion=3, topologyVersion=null, lastWriteDate=Wed Jan 19 11:52:24 UTC 2022, lastUpdateTimeNanos=188123424943131} (org.mongodb.driver.cluster:71)
[2022-01-19 11:52:25,566] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:6, serverValue:7745}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)
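These lines come from the connect pod and can be tailed with something like:

kubectl logs deployment/kafka-connect --namespace default -f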
To Reproduce
From the producer:
kubectl exec --tty -i kafka-client --namespace default -- kafka-console-producer.sh --broker-list kafka-0.kafka-headless.default.svc.cluster.local:9092 --topic sink-topic
>{"foo":"bar.....12"}
>{"foo":"bar.....1122"}
>
In MongoDB:
rs0:PRIMARY> use mydb;
rs0:PRIMARY> db.sink.find()
{ "_id" : ObjectId("61e7fb793bb99a00505efa14"), "foo" : "bar.....12" }
{ "_id" : ObjectId("61e7fb9f3bb99a00505efa16"), "foo" : "bar.....1122" }
rs0:PRIMARY>
Issue
The problem is that this chart only works for the mongo-sink connector. I am not able to use the mongo-source-connector (MongoDB as a source) with the above values.yml. Note: to use the mongo-source-connector, I make the following change to the command in the values.yml file:
command: [ "/bin/bash", "-c", "/opt/bitnami/kafka/bin/connect-standalone.sh /config/connect-standalone.properties /config/mongodb-source.properties"]
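With that change in place, the source topic can be watched from the client pod (a sketch, assuming the topic name configured in mongodb-source.properties above):

kubectl exec --tty -i kafka-client --namespace default -- kafka-console-consumer.sh --bootstrap-server kafka-0.kafka-headless.default.svc.cluster.local:9092 --topic source-topic --from-beginning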
Expected behavior
(i) Kafka Connect works with Mongo as a SINK, i.e. kafka --> mongo (OK).
(ii) It does not work with Mongo as a SOURCE, i.e. mongo --> kafka (NOT OK).
There are no errors in the pods, nor any error messages suggesting that Kafka cannot communicate with Mongo.
My question: how do I get the mongo-source-connector working with this setup?
Thanks in advance for your feedback.
Upvotes: 0
Views: 334
Reputation: 191844
"The problem is that this chart is working only for mongo-sink"

Because that is the only connector properties file you gave to connect-standalone.sh. You can give it multiple:
command:
  - /bin/bash
  - -c
  - >-
    /opt/bitnami/kafka/bin/connect-standalone.sh
    /config/connect-standalone.properties
    /config/mongodb-source.properties
    /config/mongodb-sink.properties
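When both files are picked up, the standalone worker normally logs a "Created connector <name>" line for each connector, so a quick check could be (sketch, using the deployment name from your output):

kubectl logs deployment/kafka-connect --namespace default | grep -i "created connector"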
Note: this is not fault-tolerant or meant for production use. Instead, your command should really use connect-distributed.sh, and your connector configs can be saved as JSON files and POST-ed to the service's REST port.
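For example, once the worker runs connect-distributed.sh, a source connector could be registered with something like the sketch below (assumptions: curl is available in the kafka-client pod, the kafka-connect service from your output is reused, and the connection settings come from your mongodb-source.properties; adjust names and extra options as needed):

kubectl exec kafka-client --namespace default -- curl -s -X POST \
  -H "Content-Type: application/json" \
  http://kafka-connect.default.svc.cluster.local:8083/connectors \
  -d '{
        "name": "mongo-source-connector",
        "config": {
          "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
          "connection.uri": "mongodb://user:[email protected]:27017/mydb",
          "database": "mydb",
          "collection": "source"
        }
      }'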
Upvotes: 1