scalacode

Reputation: 1106

Use the Kafka Connect MongoDB Debezium source connector on a remote MSK Kafka cluster

I want to read data from MongoDB into a Kafka topic. I managed to get this working locally with the following connector properties file:

name=mongodb-source-connectorszes
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=test/localhost:27017
database.history.kafka.bootstrap.servers=kafka:9092
mongodb.name=mongo_conn
database.whitelist=test
initial.sync.max.threads=1
tasks.max=1

The Connect worker has the following configuration:

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000


zookeeper.connect=localhost:2181

rest.port=18083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java/test

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
bootstrap.servers=localhost:9092

This works flawlessly with my local Kafka. I now want to run it against a remote MSK Kafka cluster. Because MSK has no built-in support for new Kafka Connect plugins, I am having difficulty getting my MongoDB source connector plugin to work. To run the connector from my local machine against MSK, I made the following changes. At the connector properties level:

name=mongodb-source-connectorszes
connector.class=io.debezium.connector.mongodb.MongoDbConnector
# keeping the same local mongo
mongodb.hosts=test/localhost:27017
database.history.kafka.bootstrap.servers=remote-msk-kakfa-brokers:9092
mongodb.name=mongo_conn
database.whitelist=test
initial.sync.max.threads=1
tasks.max=1

At the Connect worker level, I made the following changes:

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000


zookeeper.connect=remote-msk-kakfa-zookeeper:9092:2181

rest.port=18083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java/test

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
bootstrap.servers=remote-msk-kakfa-brokers:9092:9092

But this does not seem to be enough, as I am getting the following error:

[2020-01-31 11:58:01,619] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 118 : {mongo_conn.test.docs=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:1031)
[2020-01-31 11:58:01,731] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 119 : {mongo_conn.test.docs=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:1031)

I can usually query the MSK cluster from my local machine (through a VPN and sshuttle to an EC2 instance). For example, to list the topics in the remote MSK cluster, I just run:

bin/kafka-topics.sh --list --zookeeper  remote-zookeeper-server:2181

I run this from my local Kafka installation folder, and the command works perfectly without changing server.properties on my local machine. Any idea how to solve this so that I can run the Debezium MongoDB source connector against the MSK cluster?

Upvotes: 1

Views: 1347

Answers (1)

OneCricketeer

Reputation: 191874

It's recommended to use the connect-distributed script and its properties file for running Connect/Debezium.

Anything that says zookeeper.connect should be removed (only Kafka brokers use that). Anything that sets bootstrap servers should point at the address MSK gives you.

If you're getting connection errors, make sure you check your firewall / VPC settings.
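
As a rough sketch of what that could look like, here is a distributed worker file adapted from the one in the question. The broker host names, the group.id, and the three storage topic names are placeholders I made up, not values from your cluster, and the replication factors assume at least three brokers.

```properties
# Hypothetical connect-distributed.properties for an MSK cluster.
# Broker host names are placeholders: use the bootstrap broker string
# that MSK reports for your cluster, one port per host.
bootstrap.servers=b-1.example-msk:9092,b-2.example-msk:9092,b-3.example-msk:9092

# Workers that share a group.id form one Connect cluster (name is an assumption).
group.id=connect-cluster-msk

# Converters carried over unchanged from the worker file in the question.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Distributed mode stores offsets, connector configs, and status in Kafka
# topics, so offset.storage.file.filename is not used. Topic names are assumptions.
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status

# Must not exceed the number of brokers in the cluster.
offset.storage.replication.factor=3
config.storage.replication.factor=3
status.storage.replication.factor=3

offset.flush.interval.ms=10000

# Carried over from the question.
rest.port=18083
plugin.path=/usr/share/java/test

# Note: there is deliberately no zookeeper.connect entry here.
```

You would start the worker with bin/connect-distributed.sh and this file. In distributed mode the connector itself is not passed on the command line; its settings are submitted as JSON to the worker's REST API instead.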

Upvotes: 1
