Faaiz

Reputation: 685

Kafka Mongo on Kubernetes (minikube) with bitnami/kafka (Mongo as source to Kafka NOT working)

Which chart: charts/bitnami/kafka/ https://github.com/bitnami/charts/tree/master/bitnami/kafka

Describe the bug: I am following the tutorial "Build a Scalable, Fault-Tolerant Messaging Cluster on Kubernetes with Apache Kafka and MongoDB".

To resolve the extraDeploy issue I followed "extraDeploy doesn't render with example in documentation" #5649. That issue is resolved and I have a working configuration as below:

  1. Dockerfile
FROM bitnami/kafka:latest

RUN mkdir -p /opt/bitnami/kafka/plugins && \
    cd /opt/bitnami/kafka/plugins && \
    curl --remote-name --location --silent https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.6.1/mongo-kafka-connect-1.6.1-all.jar
  2. values.yml
extraDeploy:
  - |
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    spec:
      replicas: 1
      selector:
        matchLabels: {{- include "common.labels.matchLabels" . | nindent 6 }}
          app.kubernetes.io/component: connector
      template:
        metadata:
          labels: {{- include "common.labels.standard" . | nindent 8 }}
            app.kubernetes.io/component: connector
        spec:
          containers:
            - name: connect
              image: kafka-connect-bitnami:5.22
              imagePullPolicy: Never
              command: [ "/bin/bash", "-c", "/opt/bitnami/kafka/bin/connect-standalone.sh /config/connect-standalone.properties /config/mongodb-sink.properties"]
              ports:
                - name: connector
                  containerPort: 8083
              volumeMounts:
                - name: configuration
                  mountPath: /config
          volumes:
            - name: configuration
              configMap:
                name: {{ include "kafka.fullname" . }}-connect
  - |
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    data:
      connect-standalone.properties: |-
        bootstrap.servers={{ include "kafka.fullname" . }}.{{ .Release.Namespace }}.svc.{{ .Values.clusterDomain }}:{{ .Values.service.port }}
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=true
        value.converter.schemas.enable=true
        offset.storage.file.filename=/tmp/connect.offsets
        offset.flush.interval.ms=20000
        plugin.path=/opt/bitnami/kafka/plugins
      mongodb-source.properties: |-
        connection.uri=mongodb://user:[email protected]:27017/mydb
        name=mongo-source-connector
        topics=source-topic
        connector.class=com.mongodb.kafka.connect.MongoSourceConnector
        tasks.max=1
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=false
        value.converter.schemas.enable=false
        database=mydb
        collection=source
        batch.size=0
        change.stream.full.document=updateLookup
        pipeline=[]
        collation=
      mongodb-sink.properties: |-
        connection.uri=mongodb://user:[email protected]:27017/mydb
        name=mongo-sink-connector
        topics=sink-topic
        connector.class=com.mongodb.kafka.connect.MongoSinkConnector
        tasks.max=1
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=false
        value.converter.schemas.enable=false
        database=mydb
        collection=sink
  - |
    apiVersion: v1
    kind: Service    
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    spec:
      ports:
        - protocol: TCP
          port: 8083
          targetPort: connector
      selector: {{- include "common.labels.matchLabels" . | nindent 4 }}
        app.kubernetes.io/component: connector
  3. I have deployed MongoDB with a replica set using the following tutorial: [Horizontally Scale Your MongoDB Deployment with Bitnami, Kubernetes and Helm](https://engineering.bitnami.com/articles/horizontally-scale-your-mongodb-deployment-with-bitnami-kubernetes-and-helm.html)

and then I run:

helm install kafka bitnami/kafka -f values.yml
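
For reference, the connect Deployment above uses a locally built image with imagePullPolicy: Never, so the image has to exist inside minikube's own Docker daemon; a rough sketch of how I build it and sanity-check the rendered extraDeploy manifests (the image tag matches the one in values.yml):

# Point the Docker CLI at minikube's Docker daemon so the image is visible to the cluster
eval $(minikube docker-env)
docker build -t kafka-connect-bitnami:5.22 .

# Optional: confirm the extraDeploy Deployment/ConfigMap/Service actually render
helm template kafka bitnami/kafka -f values.yml | grep -n "kafka-connect"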

I have the following setup up and running for the mongo-sink-connector:


NAME                                READY   STATUS    RESTARTS      AGE
pod/kafka-0                         1/1     Running   1 (35h ago)   35h
pod/kafka-client                    1/1     Running   0             43h
pod/kafka-connect-669487944-gb4p7   1/1     Running   0             57m
pod/kafka-zookeeper-0               1/1     Running   0             35h
pod/mongodb-arbiter-0               1/1     Running   0             42h
pod/mongodb-client                  1/1     Running   0             41h
pod/mongodb-primary-0               2/2     Running   0             42h
pod/mongodb-secondary-0             2/2     Running   0             42h

NAME                               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
service/kafka                      ClusterIP   10.98.236.199   <none>        9092/TCP                     35h
service/kafka-connect              ClusterIP   10.105.58.215   <none>        8083/TCP                     35h
service/kafka-headless             ClusterIP   None            <none>        9092/TCP,9093/TCP            35h
service/kafka-zookeeper            ClusterIP   10.108.22.188   <none>        2181/TCP,2888/TCP,3888/TCP   35h
service/kafka-zookeeper-headless   ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP   35h
service/kubernetes                 ClusterIP   10.96.0.1       <none>        443/TCP                      14d
service/mongodb                    ClusterIP   10.96.105.153   <none>        27017/TCP,9216/TCP           42h
service/mongodb-headless           ClusterIP   None            <none>        27017/TCP                    42h

NAME                            READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/kafka-connect   1/1     1            1           35h

NAME                                       DESIRED   CURRENT   READY   AGE
replicaset.apps/kafka-connect-54cff6f879   0         0         0       35h
replicaset.apps/kafka-connect-59fcf7754c   0         0         0       24h
replicaset.apps/kafka-connect-64c5697f54   0         0         0       21h
replicaset.apps/kafka-connect-669487944    1         1         1       21h
replicaset.apps/kafka-connect-66c6dd4679   0         0         0       35h
replicaset.apps/kafka-connect-84ffbffd5c   0         0         0       23h

NAME                                 READY   AGE
statefulset.apps/kafka               1/1     35h
statefulset.apps/kafka-zookeeper     1/1     35h
statefulset.apps/mongodb-arbiter     1/1     42h
statefulset.apps/mongodb-primary     1/1     42h
statefulset.apps/mongodb-secondary   1/1     42h

I can publish messages to MongoDB successfully with the above configuration. Below are some log messages from the kafka-connect pod after a successful sink.

[2022-01-19 11:52:25,476] INFO [mongo-sink-connector|task-0] [Consumer clientId=connector-consumer-mongo-sink-connector-0, groupId=connect-mongo-sink-connector] Setting offset for partition sink-topic-0 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka-0.kafka-headless.default.svc.cluster.local:9092 (id: 0 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:818)
[2022-01-19 11:52:25,519] INFO [mongo-sink-connector|task-0] Cluster created with settings {hosts=[mongodb.default.svc.cluster.local:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'} (org.mongodb.driver.cluster:71)
[2022-01-19 11:52:25,522] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:5, serverValue:7744}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)
[2022-01-19 11:52:25,522] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:4, serverValue:7743}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)
[2022-01-19 11:52:25,523] INFO [mongo-sink-connector|task-0] Monitor thread successfully connected to server with description ServerDescription{address=mongodb.default.svc.cluster.local:27017, type=REPLICA_SET_PRIMARY, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=2650238, setName='rs0', canonicalAddress=mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017, hosts=[mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017, mongodb-secondary-0.mongodb-headless.default.svc.cluster.local:27017], passives=[], arbiters=[mongodb-arbiter-0.mongodb-headless.default.svc.cluster.local:27017], primary='mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017', tagSet=TagSet{[]}, electionId=7fffffff0000000000000002, setVersion=3, topologyVersion=null, lastWriteDate=Wed Jan 19 11:52:24 UTC 2022, lastUpdateTimeNanos=188123424943131} (org.mongodb.driver.cluster:71)
[2022-01-19 11:52:25,566] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:6, serverValue:7745}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)

To Reproduce

From producer:

kubectl exec --tty -i kafka-client --namespace default -- kafka-console-producer.sh --broker-list kafka-0.kafka-headless.default.svc.cluster.local:9092 --topic sink-topic  

>{"foo":"bar.....12"}  
>{"foo":"bar.....1122"}
>

In mongodb:

rs0:PRIMARY> use mydb;
rs0:PRIMARY> db.sink.find()
{ "_id" : ObjectId("61e7fb793bb99a00505efa14"), "foo" : "bar.....12" }
{ "_id" : ObjectId("61e7fb9f3bb99a00505efa16"), "foo" : "bar.....1122" }
rs0:PRIMARY> 

Issue: The problem is that this chart is working only for the mongo-sink. I am not able to use the mongo-source-connector to use MongoDB as a source with the above values.yml configuration. Note: to use the mongo-source-connector, I make the following change in the values.yml file:

command: [ "/bin/bash", "-c", "/opt/bitnami/kafka/bin/connect-standalone.sh /config/connect-standalone.properties /config/mongodb-source.properties"]
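
After redeploying with this change, I check on the Kafka side whether the source connector publishes anything, using the same kafka-client pod as above (a sketch; the topic name is the one configured in mongodb-source.properties):

# List all topics, to see whether the source connector has written anywhere at all
kubectl exec --tty -i kafka-client --namespace default -- kafka-topics.sh --bootstrap-server kafka-0.kafka-headless.default.svc.cluster.local:9092 --list

# Consume from the configured topic
kubectl exec --tty -i kafka-client --namespace default -- kafka-console-consumer.sh --bootstrap-server kafka-0.kafka-headless.default.svc.cluster.local:9092 --topic source-topic --from-beginning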

Expected behavior

(i) Kafka Connect is working with Mongo as SINK, meaning kafka --> mongo (OK).

(ii) But it is not working as SOURCE, meaning mongo --> kafka (NOT OK).

There are NO errors in the pods, nor any error messages indicating that Kafka cannot communicate with MongoDB.
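
To double-check the connector itself, the Kafka Connect REST API on port 8083 (exposed by the kafka-connect service above) can be queried; a sketch using a port-forward, with the connector name taken from the properties file:

kubectl port-forward service/kafka-connect 8083:8083 &

# List the connectors loaded by the worker, then check the task state of the source connector
curl -s http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/mongo-source-connector/status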

My Questions:

  1. If I am missing any necessary settings, what are they in order to achieve (ii)?
  2. Is it possible to run the source connector and the sink connector simultaneously, to receive updates in Kafka and to send data from Kafka? If yes, what are the necessary updates to accomplish this?

Thanks in advance for your feedback

Upvotes: 0

Views: 334

Answers (1)

OneCricketeer

Reputation: 191844

The problem is that this chart is working only for mongo-sink

Because that is the only connector file you gave.

You can give multiple property files:

command:
  - /bin/bash
  - -c
  - >-
    /opt/bitnami/kafka/bin/connect-standalone.sh
    /config/connect-standalone.properties
    /config/mongodb-source.properties
    /config/mongodb-sink.properties

Note: This is not fault-tolerant, nor meant for production. Instead, your command should really use connect-distributed.sh, and your connector configs can be saved as JSON files and POST-ed to the service ingress port.
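
For example, a rough sketch of registering the source connector against a worker started with connect-distributed.sh (this assumes a connect-distributed.properties with group.id and the internal offset/config/status storage topics is added to the ConfigMap; field values mirror the standalone mongodb-source.properties above, with the credentials left as placeholders):

# Run from a pod inside the cluster (e.g. kafka-client) or via a port-forward to the kafka-connect service
curl -X POST -H "Content-Type: application/json" http://kafka-connect:8083/connectors -d '{
  "name": "mongo-source-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://<user>:<password>@mongodb.default.svc.cluster.local:27017/mydb",
    "database": "mydb",
    "collection": "source",
    "tasks.max": "1"
  }
}'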

Upvotes: 1
