Reputation: 57
I am new to the Kafka space and I have set up the Strimzi Cluster Operator, a Kafka bootstrap server, the Entity Operator, and Kafka Connect in Kubernetes following the guidelines below:
https://strimzi.io/docs/operators/latest/deploying.html
How do I set up a Kafka MongoDB sink connector for the Strimzi Kafka Connect cluster?
I have the official MongoDB connector plugin. Can I use this plugin to connect to MongoDB Atlas?
Most of the forums have explanations for Confluent Kafka but not Strimzi Kafka.
Below is my Kafka Connect config:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-mongo-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: STRIMZI KAFKA CONNECT IMAGE WITH MONGODB PLUGIN
  version: 3.2.1
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  logging:
    type: inline
    loggers:
      connect.root.logger.level: "INFO"
  config:
    group.id: my-cluster
    offset.storage.topic: mongo-connect-cluster-offsets
    config.storage.topic: mongo-connect-cluster-configs
    status.storage.topic: mongo-connect-cluster-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
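For reference, this is roughly how I apply the resource and check it (assuming the file is saved as kafka-connect.yaml and everything lives in the kafka namespace from the Strimzi guide):
kubectl -n kafka apply -f kafka-connect.yaml
kubectl -n kafka get kafkaconnect my-mongo-connect
kubectl -n kafka get pods -l strimzi.io/cluster=my-mongo-connect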
Below is my sink connector config:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mongodb-sink-connector
  labels:
    strimzi.io/cluster: my-cluster
spec:
  class: com.mongodb.kafka.connect.MongoSinkConnector
  tasksMax: 2
  config:
    topics: my-topic
    connection.uri: "MONGO ATLAS CONNECTION STRING"
    database: my_database
    collection: my_collection
    post.processor.chain: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
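For connection.uri I am using the standard Atlas SRV connection string, roughly in this format (placeholders, not my real credentials):
connection.uri: "mongodb+srv://<username>:<password>@<cluster-host>.mongodb.net/my_database?retryWrites=true&w=majority"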
But the above setup is not working, even though my Kafka cluster is up and running and the producer-consumer example works.
Is the official MongoDB plugin (from Maven Central) appropriate for this, or should I use the Debezium MongoDB connector?
If anyone can shed some light on a step-by-step guideline in this regard, that would be of great help.
Thanks in advance.
Upvotes: 0
Views: 471
Reputation: 57
Since the comment section is getting longer, I'm posting some of the answers to the questions asked in the comments.
Below is my dockerfile:
FROM quay.io/strimzi/kafka:0.31.0-kafka-3.2.1
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
The my-plugins folder contains two jar files: mongo-kafka-connect-1.8.0.jar and mongodb-driver-core-4.7.1.jar. I'm not sure if I need the driver-core jar for MongoDB Atlas, but anyway I have it there.
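For completeness, this is roughly how I build and push that image (the registry name is a placeholder); spec.image in the KafkaConnect resource above points at the pushed tag:
docker build -t <my-registry>/my-connect-mongodb:latest .
docker push <my-registry>/my-connect-mongodb:latest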
I changed the producer-consumer example version to the following:
kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.31.0-kafka-3.2.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic my-topic
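The consumer side is the matching command from the same quick start:
kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.31.0-kafka-3.2.1 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning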
To summarize, my Strimzi operator is 0.31.0 and Kafka Connect is set to 3.2.1, which is aligned with the compatibility table here under supported versions.
Regarding adding a tls section to the Kafka Connect spec: the liveness probe fails saying "failed to connect to the IP-ADDRESS" and the pod keeps restarting.
Below is my tls spec, where the referenced certificate is present in my Kafka cluster:
tls:
  trustedCertificates:
    - secretName: my-cluster-cluster-ca-cert
      certificate: ca.crt
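This is how I check that the secret and its ca.crt key actually exist (which is what I meant by the cert being present); I am also not sure whether bootstrapServers has to point at the TLS listener port (9093) instead of 9092 once tls is added:
kubectl -n kafka get secret my-cluster-cluster-ca-cert
kubectl -n kafka get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d | head -n 1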
I removed the key converter configs as suggested. But what should the group.id in my Kafka Connect config be?
I also changed the storage.topic configs to the following:
offset.storage.topic: mongo-connect-cluster-offsets
config.storage.topic: mongo-connect-cluster-configs
status.storage.topic: mongo-connect-cluster-status
I referred to this blog for the above settings.
The output from
kubectl -n kafka exec -it YOUR-KAFKA-CONNECT-POD -- curl http://localhost:8083/connectors
is [], so is there a problem here?
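I still need to confirm that the MongoDB plugin is actually loaded and that the KafkaConnector resource was picked up, for example with:
kubectl -n kafka exec -it YOUR-KAFKA-CONNECT-POD -- curl http://localhost:8083/connector-plugins
kubectl -n kafka get kafkaconnector mongodb-sink-connector -o yaml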
The output from
kubectl -n kafka exec -it YOUR-KAFKA-CONNECT-POD -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list
is:
__consumer_offsets
__strimzi-topic-operator-kstreams-topic-store-changelog
__strimzi_store_topic
mongo-connect-cluster-configs
mongo-connect-cluster-offsets
mongo-connect-cluster-status
my-topic
Below are the pod logs for Kafka Connect; I will find a way to share the whole log files soon.
So, where am I going wrong? Also, how do I verify that the data flow is happening as intended?
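What I was planning to try for verification (not sure if this is the right approach) is to produce a few test messages to my-topic with the console producer above and then count the documents in the collection, assuming mongosh is installed locally:
mongosh "MONGO ATLAS CONNECTION STRING" --eval 'db.getSiblingDB("my_database").my_collection.countDocuments()'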
Upvotes: 0