Reputation: 1106
I want to read data from MongoDB into a Kafka topic. I managed to get this working locally with the following connector properties file:
name=mongodb-source-connectorszes
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=test/localhost:27017
database.history.kafka.bootstrap.servers=kafka:9092
mongodb.name=mongo_conn
database.whitelist=test
initial.sync.max.threads=1
tasks.max=1
The Connect worker has the following configuration:
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
zookeeper.connect=localhost:2181
rest.port=18083
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java/test
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
bootstrap.servers=localhost:9092
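For completeness, this is how I start everything locally (the file names are my own; the REST check uses the rest.port configured above):
# start a standalone Connect worker with the two files above
bin/connect-standalone.sh connect-worker.properties mongodb-source-connector.properties
# verify the Debezium plugin was picked up from plugin.path via the Connect REST API
curl http://localhost:18083/connector-plugins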
This works flawlessly with my local Kafka. Now I want to run it against a remote MSK Kafka cluster. Since MSK has no built-in support for custom Kafka Connect plugins, I am having trouble getting my Kafka Connect MongoDB source plugin to work there. To point the connector at the remote cluster from my local machine, I made the following modifications. At the connector properties level:
name=mongodb-source-connectorszes
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=test/localhost:27017   # keeping the same local MongoDB
database.history.kafka.bootstrap.servers=remote-msk-kafka-brokers:9092
mongodb.name=mongo_conn
database.whitelist=test
initial.sync.max.threads=1
tasks.max=1
At the Connect worker level, I made the following modifications:
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
zookeeper.connect=remote-msk-kafka-zookeeper:2181
rest.port=18083
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java/test
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
bootstrap.servers=remote-msk-kafka-brokers:9092
But it seems this is not enough, as I am getting the following error:
[2020-01-31 11:58:01,619] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 118 : {mongo_conn.test.docs=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:1031)
[2020-01-31 11:58:01,731] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 119 : {mongo_conn.test.docs=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:1031)
Normally, I can reach the MSK cluster from my local machine (via a VPN and sshuttle to an EC2 instance). For example, to list the topics in the remote MSK cluster, I just run the following from my local Kafka installation folder:
bin/kafka-topics.sh --list --zookeeper remote-zookeeper-server:2181
This command works perfectly, without changing server.properties on my local machine. Any idea how to solve this so that I can point the Debezium MongoDB source connector at the MSK cluster?
Upvotes: 1
Views: 1347
Reputation: 191874
It's recommended to use the connect-distributed script and properties for running Connect/Debezium.
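For example, a minimal sketch (assuming the worker's REST API listens on the default port 8083, and reusing the connector settings from the question):
# start a distributed worker instead of a standalone one
bin/connect-distributed.sh config/connect-distributed.properties
# in distributed mode the connector is submitted as JSON over the REST API,
# not as a properties file on the command line
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "mongodb-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "test/localhost:27017",
    "mongodb.name": "mongo_conn",
    "database.whitelist": "test",
    "initial.sync.max.threads": "1",
    "tasks.max": "1"
  }
}'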
Anything that says zookeeper.connect should be removed (only Kafka brokers use that setting). Anything that says bootstrap.servers should point at the addresses MSK gives you.
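A rough sketch of the distributed worker properties (group.id and the three storage topic names are placeholders you can choose freely; note there is no zookeeper.connect line at all):
bootstrap.servers=<msk-bootstrap-brokers>:9092
group.id=connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
plugin.path=/usr/share/java/test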
If you're getting connection errors, make sure you check your firewall / VPC settings.
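For example, from the machine running Connect, a quick sanity check of broker reachability (the hostname is a placeholder for the bootstrap address MSK gives you):
# does the broker port answer at all?
nc -vz <msk-bootstrap-broker> 9092
# can a Kafka client list topics over the broker protocol (not ZooKeeper)?
bin/kafka-topics.sh --list --bootstrap-server <msk-bootstrap-broker>:9092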
Upvotes: 1