Reputation: 11
I am using the Confluent Kafka JDBC sink connector. I want the sink connector to insert data into a specific table, TB_TEST_KAFKA, which I have already created, so I set auto.create=false.
I don't know which sink connector property sets the target table name. This is my current configuration:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
errors.log.include.messages=true
dialect.name=SqlServerDatabaseDialect
connection.password=####
tasks.max=1
topics=test-topic
auto.evolve=false
connection.user=kafkauser
auto.create=false
connection.url=jdbc:sqlserver://####:1433;databaseName=TEST
errors.log.enable=true
insert.mode=insert
db.name=TB_TEST_KAFKA
But I get this error:
Caused by: org.apache.kafka.connect.errors.ConnectException: Table "TB_TEST_KAFKA" is missing and auto-creation is disabled
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:116)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:68)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:73)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more
Is there any way to make the connector insert into that table?
Upvotes: 1
Views: 1971
Reputation: 567
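You can implement TableNamingStrategy and override its resolveTableName() method. Note that this applies to the Debezium JDBC sink connector (the io.debezium class name below), not to io.confluent.connect.jdbc.JdbcSinkConnector. The relevant property is:
table.naming.strategy
Default: io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy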
Specifies the fully-qualified class name of a TableNamingStrategy implementation that the connector uses to resolve table names from incoming event topic names.
The default behavior is to:
Replace the ${topic} placeholder in the table.name.format configuration property with the event’s topic.
Sanitize the table name by replacing dots (.) with underscores (_).
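The two default steps described above can be sketched in plain Java. This is only an illustration of the documented behavior, not Debezium's actual implementation; the class name DefaultTableNameSketch and the method resolveTableName are hypothetical and do not use any connector classes.

```java
// Illustrative sketch only: mimics the documented default naming behavior
// with plain string operations. Not taken from the Debezium source.
class DefaultTableNameSketch {

    // tableNameFormat stands in for the table.name.format property value.
    static String resolveTableName(String tableNameFormat, String topic) {
        // Step 1: substitute the ${topic} placeholder with the topic name.
        String name = tableNameFormat.replace("${topic}", topic);
        // Step 2: sanitize by replacing dots with underscores.
        return name.replace(".", "_");
    }

    public static void main(String[] args) {
        // A dotted topic name becomes an underscore-separated table name.
        System.out.println(resolveTableName("${topic}", "server1.inventory.customers"));
        // A format without the placeholder yields a fixed table name.
        System.out.println(resolveTableName("TB_TEST_KAFKA", "test-topic"));
    }
}
```

Under this sketch, a table.name.format value that contains no ${topic} placeholder resolves to the same table name for every topic.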
Upvotes: 0
Reputation: 4375
The JDBC sink connector uses the topic name as the basis for the name of the table it populates. You can use the RegexRouter transform to rename the topic before the sink resolves the table, so that records from test-topic are written to TB_TEST_KAFKA. Add these lines to your connector properties:
transforms=renameTopic
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.renameTopic.regex=test-topic
transforms.renameTopic.replacement=TB_TEST_KAFKA
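To see what these settings do to a topic name, here is a minimal Java sketch of the matching rule, assuming RegexRouter rewrites the topic only when the regex matches the whole topic name. The class RegexRouterSketch and its route method are hypothetical stand-ins built on java.util.regex; they are not part of Kafka Connect.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch only: reproduces the full-match-then-replace rule
// with the JDK regex classes, without any Kafka Connect dependency.
class RegexRouterSketch {

    static String route(String regex, String replacement, String topic) {
        Matcher matcher = Pattern.compile(regex).matcher(topic);
        // Rewrite only when the entire topic name matches the regex;
        // otherwise the topic passes through unchanged.
        return matcher.matches() ? matcher.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        // Matches the configured regex, so it is routed to the table name.
        System.out.println(route("test-topic", "TB_TEST_KAFKA", "test-topic"));
        // Does not match in full, so the name is left as-is.
        System.out.println(route("test-topic", "TB_TEST_KAFKA", "test-topic-2"));
    }
}
```

Because the match must cover the whole name, a topic such as test-topic-2 would not be rerouted by this configuration; a broader pattern would be needed for that.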
Upvotes: 2