chirag

Reputation: 57

MongoDB Atlas sink connector for Strimzi Kafka setup

I am new to the Kafka space, and I have set up the Strimzi cluster operator, Kafka bootstrap server, entity operator, and Kafka Connect in Kubernetes following the guidelines below:

https://strimzi.io/docs/operators/latest/deploying.html

How do I set up the Kafka MongoDB sink connector for a Strimzi Kafka Connect cluster?

I have the official MongoDB connector plugin. Can I use this plugin to connect to MongoDB Atlas?

Most of the forums explain this for Confluent Kafka but not for Strimzi Kafka.

Below is my Kafka Connect config:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-mongo-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: STRIMZI KAFKA CONNECT IMAGE WITH MONGODB PLUGIN
  version: 3.2.1
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  logging:
    type: inline
    loggers:
      connect.root.logger.level: "INFO"
  config:
    group.id: my-cluster
    offset.storage.topic: mongo-connect-cluster-offsets
    config.storage.topic: mongo-connect-cluster-configs
    status.storage.topic: mongo-connect-cluster-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
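To apply this I run the following and wait for the Connect pod to come up (the file name kafka-connect.yaml is just what I called it locally; the strimzi.io/cluster label on the pods is set by the operator to the KafkaConnect resource name):

kubectl -n kafka apply -f kafka-connect.yaml
kubectl -n kafka get kafkaconnect my-mongo-connect
kubectl -n kafka get pods -l strimzi.io/cluster=my-mongo-connect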

Below is my sink connector config:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mongodb-sink-connector
  labels:
    strimzi.io/cluster: my-cluster
spec:
  class: com.mongodb.kafka.connect.MongoSinkConnector
  tasksMax: 2
  config:
    topics: my-topic
    connection.uri: "MONGO ATLAS CONNECTION STRING"
    database: my_database
    collection: my_collection
    post.processor.chain: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
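One thing I am not sure about is the strimzi.io/cluster label: the Strimzi docs say it must name the KafkaConnect resource that the connector belongs to, which here would be my-mongo-connect rather than the Kafka cluster name, i.e.:

metadata:
  name: mongodb-sink-connector
  labels:
    strimzi.io/cluster: my-mongo-connect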

But the above setup is not working, even though my Kafka cluster is up and running and the producer-consumer example works.

Is the official MongoDB plugin (the one on Maven Central) appropriate for this, or do I use the Debezium MongoDB connector?

If anyone can shed some light on a step-by-step guideline in this regard, that would be of great help.

Thanks in advance.

Upvotes: 0

Views: 471

Answers (1)

chirag

Reputation: 57

Since the comment section is getting long, I'm posting answers to some of the questions asked in the comments here.

Below is my dockerfile:

FROM quay.io/strimzi/kafka:0.31.0-kafka-3.2.1
USER root:root
# copy the connector jars into the default Connect plugin path
COPY ./my-plugins/ /opt/kafka/plugins/
# switch back to the image's non-root kafka user
USER 1001

The my-plugins folder contains two jar files, mongo-kafka-connect-1.8.0.jar and mongodb-driver-core-4.7.1.jar. I'm not sure whether I need the driver-core jar for MongoDB Atlas, but I have it there anyway.
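One thing I noticed on Maven Central: the plain mongo-kafka-connect jar does not bundle the MongoDB driver, so driver-core alone may not be enough (the connector also depends on mongodb-driver-sync and bson). There is also an uber-jar published with the all classifier that bundles the driver, which would simplify the image; the mongodb subdirectory below is just my own choice of plugin folder:

FROM quay.io/strimzi/kafka:0.31.0-kafka-3.2.1
USER root:root
# the -all jar bundles the MongoDB driver, so no separate driver jars are needed
COPY ./my-plugins/mongo-kafka-connect-1.8.0-all.jar /opt/kafka/plugins/mongodb/
USER 1001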

I changed the producer-consumer example to use the matching image version:

kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.31.0-kafka-3.2.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic my-topic

To summarize, my Strimzi operator is 0.31.0 and Kafka Connect is set to 3.2.1, which is aligned with the compatibility table here under supported versions.

Regarding adding the tls spec section to Kafka Connect: the liveness probe fails saying failed to connect to the IP-ADDRESS, and the pod keeps restarting.

Below is my tls spec, where the cert secret comes from my Kafka cluster:

tls:
  trustedCertificates:
    - secretName: my-cluster-cluster-ca-cert
      certificate: ca.crt
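I suspect the probe failure is because, with tls enabled, my Connect config still points at the plain listener on port 9092; Strimzi exposes the internal TLS listener on 9093, so presumably the bootstrap address needs to change along with the tls section:

bootstrapServers: my-cluster-kafka-bootstrap:9093
tls:
  trustedCertificates:
    - secretName: my-cluster-cluster-ca-cert
      certificate: ca.crt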

I removed the key converter configs as suggested. But what should the group.id be in my Kafka Connect config?
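From what I understand, group.id is just the consumer-group name that the distributed Connect workers share to find each other, so any value works as long as no other Connect cluster (or consumer group) on the same brokers uses it, e.g.:

group.id: mongo-connect-cluster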

I also changed the storage topic configs to the following:

   offset.storage.topic: mongo-connect-cluster-offsets
   config.storage.topic: mongo-connect-cluster-configs
   status.storage.topic: mongo-connect-cluster-status

I referred to this blog for the above settings.

The output of kubectl -n kafka exec -it YOUR-KAFKA-CONNECT-POD -- curl http://localhost:8083/connectors is [], so is there a problem here?
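Since [] means no connector is registered with the Connect REST API at all, I am now checking the KafkaConnector resource status and the Connect pod log; in my Strimzi version Connect runs as a Deployment named <connect-name>-connect, so the names below match my setup above:

kubectl -n kafka get kafkaconnector mongodb-sink-connector -o yaml
kubectl -n kafka logs deployment/my-mongo-connect-connect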

The output of kubectl -n kafka exec -it YOUR-KAFKA-CONNECT-POD -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list:

__consumer_offsets
__strimzi-topic-operator-kstreams-topic-store-changelog
__strimzi_store_topic
mongo-connect-cluster-configs
mongo-connect-cluster-offsets
mongo-connect-cluster-status
my-topic

Below are the pod logs for kafka-connect; I will find a way to share the whole log files soon.

kafka-connect

So, where am I going wrong? Also, how do I verify that the data is flowing the way it is intended to?
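For the verification part, my plan (once the connector actually shows up under /connectors) is to produce a JSON record, check the connector task state, and then look for the document in my_database.my_collection in the Atlas UI; the record below is just a made-up example:

kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.31.0-kafka-3.2.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic my-topic
{"name": "test", "value": 1}

kubectl -n kafka exec -it YOUR-KAFKA-CONNECT-POD -- curl http://localhost:8083/connectors/mongodb-sink-connector/status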

Upvotes: 0
