Reputation: 319
Hi I am using debezium to capture changes in Mongo and push them into mysql I am using the following link https://github.com/debezium/debezium-examples/tree/master/unwrap-mongodb-smt where I am replacing the end postgres db to mysql database , but I am unable to do so.
This is my revised jdbc-sink.json, where I am using mysql url to connect.
{
"name" : "jdbc-sink",
"config" : {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max" : "1",
"topics" : "customers",
"connection.url" : "jdbc:mysql://localhost:3306/inventorydb?user=user&password=password",
"auto.create" : "true",
"auto.evolve" : "true",
"insert.mode" : "upsert",
"delete.enabled": "true",
"pk.fields" : "id",
"pk.mode": "record_key",
"transforms": "mongoflatten",
"transforms.mongoflatten.type" : "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"transforms.mongoflatten.drop.tombstones": "false"
}
}
But I am getting the following error while running
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json
HTTP/1.1 500 Internal Server Error Date: Wed, 06 Nov 2019 08:13:39 GMT Content-Type: application/json Content-Length: 3404 Server: Jetty(9.4.18.v20190429)
{"error_code":500,"message":"Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector, available connectors are: PluginDesc{klass=class io.debezium.connector.mongodb.MongoDbConnector, name='io.debezium.connector.mongodb.MongoDbConnector', version='1.0.0-SNAPSHOT', encodedVersion=1.0.0-SNAPSHOT, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-mongodb/'}, PluginDesc{klass=class io.debezium.connector.mysql.MySqlConnector, name='io.debezium.connector.mysql.MySqlConnector', version='1.0.0-SNAPSHOT', encodedVersion=1.0.0-SNAPSHOT, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-mysql/'}, PluginDesc{klass=class io.debezium.connector.oracle.OracleConnector, name='io.debezium.connector.oracle.OracleConnector', version='1.0.0-SNAPSHOT', encodedVersion=1.0.0-SNAPSHOT, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-oracle/'}, PluginDesc{klass=class io.debezium.connector.postgresql.PostgresConnector, name='io.debezium.connector.postgresql.PostgresConnector', version='1.0.0-SNAPSHOT', encodedVersion=1.0.0-SNAPSHOT, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-postgres/'}, PluginDesc{klass=class io.debezium.connector.sqlserver.SqlServerConnector, name='io.debezium.connector.sqlserver.SqlServerConnector', version='1.0.0-SNAPSHOT', encodedVersion=1.0.0-SNAPSHOT, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-sqlserver/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.3.0', encodedVersion=2.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.3.0', encodedVersion=2.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.3.0', encodedVersion=2.3.0, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.3.0', encodedVersion=2.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.3.0', encodedVersion=2.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.3.0', encodedVersion=2.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.3.0', encodedVersion=2.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.3.0', encodedVersion=2.3.0, type=source, typeName='source', location='classpath'}"}
I understand that some it is unable to find io.confluent.connect.jdbc.JdbcSinkConnector but how should I / and where should I keep such a jar.
Thanks
Upvotes: 0
Views: 1285
Reputation: 77
Download the JAR using the following command and keep it in the plugins directory:
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.0.0/kafka-connect-jdbc-10.0.0.jar
Upvotes: 0
Reputation: 1976
You have not provided the sink connector in the Kafka Connect, please see that command docker-compose up --build -d
is used to start that builds new Connect image with JDBC sink connector baked in https://github.com/debezium/debezium-examples/blob/master/unwrap-mongodb-smt/debezium-jdbc/Dockerfile#L10
Upvotes: 0